Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/utils.py: 22%
246 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1import io
2import mimetypes
3import os
4import pkgutil
5import re
6import sys
7import typing as t
8import unicodedata
9from datetime import datetime
10from time import time
11from zlib import adler32
13from markupsafe import escape
15from ._internal import _DictAccessorProperty
16from ._internal import _missing
17from ._internal import _TAccessorValue
18from .datastructures import Headers
19from .exceptions import NotFound
20from .exceptions import RequestedRangeNotSatisfiable
21from .security import safe_join
22from .urls import url_quote
23from .wsgi import wrap_file
25if t.TYPE_CHECKING:
26 from _typeshed.wsgi import WSGIEnvironment
27 from .wrappers.request import Request
28 from .wrappers.response import Response
30_T = t.TypeVar("_T")
32_entity_re = re.compile(r"&([^;]+);")
33_filename_ascii_strip_re = re.compile(r"[^A-Za-z0-9_.-]")
34_windows_device_files = (
35 "CON",
36 "AUX",
37 "COM1",
38 "COM2",
39 "COM3",
40 "COM4",
41 "LPT1",
42 "LPT2",
43 "LPT3",
44 "PRN",
45 "NUL",
46)
49class cached_property(property, t.Generic[_T]):
50 """A :func:`property` that is only evaluated once. Subsequent access
51 returns the cached value. Setting the property sets the cached
52 value. Deleting the property clears the cached value, accessing it
53 again will evaluate it again.
55 .. code-block:: python
57 class Example:
58 @cached_property
59 def value(self):
60 # calculate something important here
61 return 42
63 e = Example()
64 e.value # evaluates
65 e.value # uses cache
66 e.value = 16 # sets cache
67 del e.value # clears cache
69 If the class defines ``__slots__``, it must add ``_cache_{name}`` as
70 a slot. Alternatively, it can add ``__dict__``, but that's usually
71 not desirable.
73 .. versionchanged:: 2.1
74 Works with ``__slots__``.
76 .. versionchanged:: 2.0
77 ``del obj.name`` clears the cached value.
78 """
80 def __init__(
81 self,
82 fget: t.Callable[[t.Any], _T],
83 name: t.Optional[str] = None,
84 doc: t.Optional[str] = None,
85 ) -> None:
86 super().__init__(fget, doc=doc)
87 self.__name__ = name or fget.__name__
88 self.slot_name = f"_cache_{self.__name__}"
89 self.__module__ = fget.__module__
91 def __set__(self, obj: object, value: _T) -> None:
92 if hasattr(obj, "__dict__"):
93 obj.__dict__[self.__name__] = value
94 else:
95 setattr(obj, self.slot_name, value)
97 def __get__(self, obj: object, type: type = None) -> _T: # type: ignore
98 if obj is None:
99 return self # type: ignore
101 obj_dict = getattr(obj, "__dict__", None)
103 if obj_dict is not None:
104 value: _T = obj_dict.get(self.__name__, _missing)
105 else:
106 value = getattr(obj, self.slot_name, _missing) # type: ignore[arg-type]
108 if value is _missing:
109 value = self.fget(obj) # type: ignore
111 if obj_dict is not None:
112 obj.__dict__[self.__name__] = value
113 else:
114 setattr(obj, self.slot_name, value)
116 return value
118 def __delete__(self, obj: object) -> None:
119 if hasattr(obj, "__dict__"):
120 del obj.__dict__[self.__name__]
121 else:
122 setattr(obj, self.slot_name, _missing)
125class environ_property(_DictAccessorProperty[_TAccessorValue]):
126 """Maps request attributes to environment variables. This works not only
127 for the Werkzeug request object, but also any other class with an
128 environ attribute:
130 >>> class Test(object):
131 ... environ = {'key': 'value'}
132 ... test = environ_property('key')
133 >>> var = Test()
134 >>> var.test
135 'value'
137 If you pass it a second value it's used as default if the key does not
138 exist, the third one can be a converter that takes a value and converts
139 it. If it raises :exc:`ValueError` or :exc:`TypeError` the default value
140 is used. If no default value is provided `None` is used.
142 Per default the property is read only. You have to explicitly enable it
143 by passing ``read_only=False`` to the constructor.
144 """
146 read_only = True
148 def lookup(self, obj: "Request") -> "WSGIEnvironment":
149 return obj.environ
152class header_property(_DictAccessorProperty[_TAccessorValue]):
153 """Like `environ_property` but for headers."""
155 def lookup(self, obj: t.Union["Request", "Response"]) -> Headers:
156 return obj.headers
159# https://cgit.freedesktop.org/xdg/shared-mime-info/tree/freedesktop.org.xml.in
160# https://www.iana.org/assignments/media-types/media-types.xhtml
161# Types listed in the XDG mime info that have a charset in the IANA registration.
162_charset_mimetypes = {
163 "application/ecmascript",
164 "application/javascript",
165 "application/sql",
166 "application/xml",
167 "application/xml-dtd",
168 "application/xml-external-parsed-entity",
169}
172def get_content_type(mimetype: str, charset: str) -> str:
173 """Returns the full content type string with charset for a mimetype.
175 If the mimetype represents text, the charset parameter will be
176 appended, otherwise the mimetype is returned unchanged.
178 :param mimetype: The mimetype to be used as content type.
179 :param charset: The charset to be appended for text mimetypes.
180 :return: The content type.
182 .. versionchanged:: 0.15
183 Any type that ends with ``+xml`` gets a charset, not just those
184 that start with ``application/``. Known text types such as
185 ``application/javascript`` are also given charsets.
186 """
187 if (
188 mimetype.startswith("text/")
189 or mimetype in _charset_mimetypes
190 or mimetype.endswith("+xml")
191 ):
192 mimetype += f"; charset={charset}"
194 return mimetype
197def secure_filename(filename: str) -> str:
198 r"""Pass it a filename and it will return a secure version of it. This
199 filename can then safely be stored on a regular file system and passed
200 to :func:`os.path.join`. The filename returned is an ASCII only string
201 for maximum portability.
203 On windows systems the function also makes sure that the file is not
204 named after one of the special device files.
206 >>> secure_filename("My cool movie.mov")
207 'My_cool_movie.mov'
208 >>> secure_filename("../../../etc/passwd")
209 'etc_passwd'
210 >>> secure_filename('i contain cool \xfcml\xe4uts.txt')
211 'i_contain_cool_umlauts.txt'
213 The function might return an empty filename. It's your responsibility
214 to ensure that the filename is unique and that you abort or
215 generate a random filename if the function returned an empty one.
217 .. versionadded:: 0.5
219 :param filename: the filename to secure
220 """
221 filename = unicodedata.normalize("NFKD", filename)
222 filename = filename.encode("ascii", "ignore").decode("ascii")
224 for sep in os.path.sep, os.path.altsep:
225 if sep:
226 filename = filename.replace(sep, " ")
227 filename = str(_filename_ascii_strip_re.sub("", "_".join(filename.split()))).strip(
228 "._"
229 )
231 # on nt a couple of special files are present in each folder. We
232 # have to ensure that the target file is not such a filename. In
233 # this case we prepend an underline
234 if (
235 os.name == "nt"
236 and filename
237 and filename.split(".")[0].upper() in _windows_device_files
238 ):
239 filename = f"_{filename}"
241 return filename
244def redirect(
245 location: str, code: int = 302, Response: t.Optional[t.Type["Response"]] = None
246) -> "Response":
247 """Returns a response object (a WSGI application) that, if called,
248 redirects the client to the target location. Supported codes are
249 301, 302, 303, 305, 307, and 308. 300 is not supported because
250 it's not a real redirect and 304 because it's the answer for a
251 request with a request with defined If-Modified-Since headers.
253 .. versionadded:: 0.6
254 The location can now be a unicode string that is encoded using
255 the :func:`iri_to_uri` function.
257 .. versionadded:: 0.10
258 The class used for the Response object can now be passed in.
260 :param location: the location the response should redirect to.
261 :param code: the redirect status code. defaults to 302.
262 :param class Response: a Response class to use when instantiating a
263 response. The default is :class:`werkzeug.wrappers.Response` if
264 unspecified.
265 """
266 if Response is None:
267 from .wrappers import Response # type: ignore
269 display_location = escape(location)
270 if isinstance(location, str):
271 # Safe conversion is necessary here as we might redirect
272 # to a broken URI scheme (for instance itms-services).
273 from .urls import iri_to_uri
275 location = iri_to_uri(location, safe_conversion=True)
277 response = Response( # type: ignore
278 "<!doctype html>\n"
279 "<html lang=en>\n"
280 "<title>Redirecting...</title>\n"
281 "<h1>Redirecting...</h1>\n"
282 "<p>You should be redirected automatically to the target URL: "
283 f'<a href="{escape(location)}">{display_location}</a>. If'
284 " not, click the link.\n",
285 code,
286 mimetype="text/html",
287 )
288 response.headers["Location"] = location
289 return response
292def append_slash_redirect(environ: "WSGIEnvironment", code: int = 308) -> "Response":
293 """Redirect to the current URL with a slash appended.
295 If the current URL is ``/user/42``, the redirect URL will be
296 ``42/``. When joined to the current URL during response
297 processing or by the browser, this will produce ``/user/42/``.
299 The behavior is undefined if the path ends with a slash already. If
300 called unconditionally on a URL, it may produce a redirect loop.
302 :param environ: Use the path and query from this WSGI environment
303 to produce the redirect URL.
304 :param code: the status code for the redirect.
306 .. versionchanged:: 2.1
307 Produce a relative URL that only modifies the last segment.
308 Relevant when the current path has multiple segments.
310 .. versionchanged:: 2.1
311 The default status code is 308 instead of 301. This preserves
312 the request method and body.
313 """
314 tail = environ["PATH_INFO"].rpartition("/")[2]
316 if not tail:
317 new_path = "./"
318 else:
319 new_path = f"{tail}/"
321 query_string = environ.get("QUERY_STRING")
323 if query_string:
324 new_path = f"{new_path}?{query_string}"
326 return redirect(new_path, code)
329def send_file(
330 path_or_file: t.Union[os.PathLike, str, t.IO[bytes]],
331 environ: "WSGIEnvironment",
332 mimetype: t.Optional[str] = None,
333 as_attachment: bool = False,
334 download_name: t.Optional[str] = None,
335 conditional: bool = True,
336 etag: t.Union[bool, str] = True,
337 last_modified: t.Optional[t.Union[datetime, int, float]] = None,
338 max_age: t.Optional[
339 t.Union[int, t.Callable[[t.Optional[str]], t.Optional[int]]]
340 ] = None,
341 use_x_sendfile: bool = False,
342 response_class: t.Optional[t.Type["Response"]] = None,
343 _root_path: t.Optional[t.Union[os.PathLike, str]] = None,
344) -> "Response":
345 """Send the contents of a file to the client.
347 The first argument can be a file path or a file-like object. Paths
348 are preferred in most cases because Werkzeug can manage the file and
349 get extra information from the path. Passing a file-like object
350 requires that the file is opened in binary mode, and is mostly
351 useful when building a file in memory with :class:`io.BytesIO`.
353 Never pass file paths provided by a user. The path is assumed to be
354 trusted, so a user could craft a path to access a file you didn't
355 intend.
357 If the WSGI server sets a ``file_wrapper`` in ``environ``, it is
358 used, otherwise Werkzeug's built-in wrapper is used. Alternatively,
359 if the HTTP server supports ``X-Sendfile``, ``use_x_sendfile=True``
360 will tell the server to send the given path, which is much more
361 efficient than reading it in Python.
363 :param path_or_file: The path to the file to send, relative to the
364 current working directory if a relative path is given.
365 Alternatively, a file-like object opened in binary mode. Make
366 sure the file pointer is seeked to the start of the data.
367 :param environ: The WSGI environ for the current request.
368 :param mimetype: The MIME type to send for the file. If not
369 provided, it will try to detect it from the file name.
370 :param as_attachment: Indicate to a browser that it should offer to
371 save the file instead of displaying it.
372 :param download_name: The default name browsers will use when saving
373 the file. Defaults to the passed file name.
374 :param conditional: Enable conditional and range responses based on
375 request headers. Requires passing a file path and ``environ``.
376 :param etag: Calculate an ETag for the file, which requires passing
377 a file path. Can also be a string to use instead.
378 :param last_modified: The last modified time to send for the file,
379 in seconds. If not provided, it will try to detect it from the
380 file path.
381 :param max_age: How long the client should cache the file, in
382 seconds. If set, ``Cache-Control`` will be ``public``, otherwise
383 it will be ``no-cache`` to prefer conditional caching.
384 :param use_x_sendfile: Set the ``X-Sendfile`` header to let the
385 server to efficiently send the file. Requires support from the
386 HTTP server. Requires passing a file path.
387 :param response_class: Build the response using this class. Defaults
388 to :class:`~werkzeug.wrappers.Response`.
389 :param _root_path: Do not use. For internal use only. Use
390 :func:`send_from_directory` to safely send files under a path.
392 .. versionchanged:: 2.0.2
393 ``send_file`` only sets a detected ``Content-Encoding`` if
394 ``as_attachment`` is disabled.
396 .. versionadded:: 2.0
397 Adapted from Flask's implementation.
399 .. versionchanged:: 2.0
400 ``download_name`` replaces Flask's ``attachment_filename``
401 parameter. If ``as_attachment=False``, it is passed with
402 ``Content-Disposition: inline`` instead.
404 .. versionchanged:: 2.0
405 ``max_age`` replaces Flask's ``cache_timeout`` parameter.
406 ``conditional`` is enabled and ``max_age`` is not set by
407 default.
409 .. versionchanged:: 2.0
410 ``etag`` replaces Flask's ``add_etags`` parameter. It can be a
411 string to use instead of generating one.
413 .. versionchanged:: 2.0
414 If an encoding is returned when guessing ``mimetype`` from
415 ``download_name``, set the ``Content-Encoding`` header.
416 """
417 if response_class is None:
418 from .wrappers import Response
420 response_class = Response
422 path: t.Optional[str] = None
423 file: t.Optional[t.IO[bytes]] = None
424 size: t.Optional[int] = None
425 mtime: t.Optional[float] = None
426 headers = Headers()
428 if isinstance(path_or_file, (os.PathLike, str)) or hasattr(
429 path_or_file, "__fspath__"
430 ):
431 path_or_file = t.cast(t.Union[os.PathLike, str], path_or_file)
433 # Flask will pass app.root_path, allowing its send_file wrapper
434 # to not have to deal with paths.
435 if _root_path is not None:
436 path = os.path.join(_root_path, path_or_file)
437 else:
438 path = os.path.abspath(path_or_file)
440 stat = os.stat(path)
441 size = stat.st_size
442 mtime = stat.st_mtime
443 else:
444 file = path_or_file
446 if download_name is None and path is not None:
447 download_name = os.path.basename(path)
449 if mimetype is None:
450 if download_name is None:
451 raise TypeError(
452 "Unable to detect the MIME type because a file name is"
453 " not available. Either set 'download_name', pass a"
454 " path instead of a file, or set 'mimetype'."
455 )
457 mimetype, encoding = mimetypes.guess_type(download_name)
459 if mimetype is None:
460 mimetype = "application/octet-stream"
462 # Don't send encoding for attachments, it causes browsers to
463 # save decompress tar.gz files.
464 if encoding is not None and not as_attachment:
465 headers.set("Content-Encoding", encoding)
467 if download_name is not None:
468 try:
469 download_name.encode("ascii")
470 except UnicodeEncodeError:
471 simple = unicodedata.normalize("NFKD", download_name)
472 simple = simple.encode("ascii", "ignore").decode("ascii")
473 quoted = url_quote(download_name, safe="")
474 names = {"filename": simple, "filename*": f"UTF-8''{quoted}"}
475 else:
476 names = {"filename": download_name}
478 value = "attachment" if as_attachment else "inline"
479 headers.set("Content-Disposition", value, **names)
480 elif as_attachment:
481 raise TypeError(
482 "No name provided for attachment. Either set"
483 " 'download_name' or pass a path instead of a file."
484 )
486 if use_x_sendfile and path is not None:
487 headers["X-Sendfile"] = path
488 data = None
489 else:
490 if file is None:
491 file = open(path, "rb") # type: ignore
492 elif isinstance(file, io.BytesIO):
493 size = file.getbuffer().nbytes
494 elif isinstance(file, io.TextIOBase):
495 raise ValueError("Files must be opened in binary mode or use BytesIO.")
497 data = wrap_file(environ, file)
499 rv = response_class(
500 data, mimetype=mimetype, headers=headers, direct_passthrough=True
501 )
503 if size is not None:
504 rv.content_length = size
506 if last_modified is not None:
507 rv.last_modified = last_modified # type: ignore
508 elif mtime is not None:
509 rv.last_modified = mtime # type: ignore
511 rv.cache_control.no_cache = True
513 # Flask will pass app.get_send_file_max_age, allowing its send_file
514 # wrapper to not have to deal with paths.
515 if callable(max_age):
516 max_age = max_age(path)
518 if max_age is not None:
519 if max_age > 0:
520 rv.cache_control.no_cache = None
521 rv.cache_control.public = True
523 rv.cache_control.max_age = max_age
524 rv.expires = int(time() + max_age) # type: ignore
526 if isinstance(etag, str):
527 rv.set_etag(etag)
528 elif etag and path is not None:
529 check = adler32(path.encode("utf-8")) & 0xFFFFFFFF
530 rv.set_etag(f"{mtime}-{size}-{check}")
532 if conditional:
533 try:
534 rv = rv.make_conditional(environ, accept_ranges=True, complete_length=size)
535 except RequestedRangeNotSatisfiable:
536 if file is not None:
537 file.close()
539 raise
541 # Some x-sendfile implementations incorrectly ignore the 304
542 # status code and send the file anyway.
543 if rv.status_code == 304:
544 rv.headers.pop("x-sendfile", None)
546 return rv
549def send_from_directory(
550 directory: t.Union[os.PathLike, str],
551 path: t.Union[os.PathLike, str],
552 environ: "WSGIEnvironment",
553 **kwargs: t.Any,
554) -> "Response":
555 """Send a file from within a directory using :func:`send_file`.
557 This is a secure way to serve files from a folder, such as static
558 files or uploads. Uses :func:`~werkzeug.security.safe_join` to
559 ensure the path coming from the client is not maliciously crafted to
560 point outside the specified directory.
562 If the final path does not point to an existing regular file,
563 returns a 404 :exc:`~werkzeug.exceptions.NotFound` error.
565 :param directory: The directory that ``path`` must be located under.
566 :param path: The path to the file to send, relative to
567 ``directory``.
568 :param environ: The WSGI environ for the current request.
569 :param kwargs: Arguments to pass to :func:`send_file`.
571 .. versionadded:: 2.0
572 Adapted from Flask's implementation.
573 """
574 path = safe_join(os.fspath(directory), os.fspath(path))
576 if path is None:
577 raise NotFound()
579 # Flask will pass app.root_path, allowing its send_from_directory
580 # wrapper to not have to deal with paths.
581 if "_root_path" in kwargs:
582 path = os.path.join(kwargs["_root_path"], path)
584 try:
585 if not os.path.isfile(path):
586 raise NotFound()
587 except ValueError:
588 # path contains null byte on Python < 3.8
589 raise NotFound() from None
591 return send_file(path, environ, **kwargs)
594def import_string(import_name: str, silent: bool = False) -> t.Any:
595 """Imports an object based on a string. This is useful if you want to
596 use import paths as endpoints or something similar. An import path can
597 be specified either in dotted notation (``xml.sax.saxutils.escape``)
598 or with a colon as object delimiter (``xml.sax.saxutils:escape``).
600 If `silent` is True the return value will be `None` if the import fails.
602 :param import_name: the dotted name for the object to import.
603 :param silent: if set to `True` import errors are ignored and
604 `None` is returned instead.
605 :return: imported object
606 """
607 import_name = import_name.replace(":", ".")
608 try:
609 try:
610 __import__(import_name)
611 except ImportError:
612 if "." not in import_name:
613 raise
614 else:
615 return sys.modules[import_name]
617 module_name, obj_name = import_name.rsplit(".", 1)
618 module = __import__(module_name, globals(), locals(), [obj_name])
619 try:
620 return getattr(module, obj_name)
621 except AttributeError as e:
622 raise ImportError(e) from None
624 except ImportError as e:
625 if not silent:
626 raise ImportStringError(import_name, e).with_traceback(
627 sys.exc_info()[2]
628 ) from None
630 return None
633def find_modules(
634 import_path: str, include_packages: bool = False, recursive: bool = False
635) -> t.Iterator[str]:
636 """Finds all the modules below a package. This can be useful to
637 automatically import all views / controllers so that their metaclasses /
638 function decorators have a chance to register themselves on the
639 application.
641 Packages are not returned unless `include_packages` is `True`. This can
642 also recursively list modules but in that case it will import all the
643 packages to get the correct load path of that module.
645 :param import_path: the dotted name for the package to find child modules.
646 :param include_packages: set to `True` if packages should be returned, too.
647 :param recursive: set to `True` if recursion should happen.
648 :return: generator
649 """
650 module = import_string(import_path)
651 path = getattr(module, "__path__", None)
652 if path is None:
653 raise ValueError(f"{import_path!r} is not a package")
654 basename = f"{module.__name__}."
655 for _importer, modname, ispkg in pkgutil.iter_modules(path):
656 modname = basename + modname
657 if ispkg:
658 if include_packages:
659 yield modname
660 if recursive:
661 yield from find_modules(modname, include_packages, True)
662 else:
663 yield modname
666class ImportStringError(ImportError):
667 """Provides information about a failed :func:`import_string` attempt."""
669 #: String in dotted notation that failed to be imported.
670 import_name: str
671 #: Wrapped exception.
672 exception: BaseException
674 def __init__(self, import_name: str, exception: BaseException) -> None:
675 self.import_name = import_name
676 self.exception = exception
677 msg = import_name
678 name = ""
679 tracked = []
680 for part in import_name.replace(":", ".").split("."):
681 name = f"{name}.{part}" if name else part
682 imported = import_string(name, silent=True)
683 if imported:
684 tracked.append((name, getattr(imported, "__file__", None)))
685 else:
686 track = [f"- {n!r} found in {i!r}." for n, i in tracked]
687 track.append(f"- {name!r} not found.")
688 track_str = "\n".join(track)
689 msg = (
690 f"import_string() failed for {import_name!r}. Possible reasons"
691 f" are:\n\n"
692 "- missing __init__.py in a package;\n"
693 "- package or module path not included in sys.path;\n"
694 "- duplicated package or module name taking precedence in"
695 " sys.path;\n"
696 "- missing module, class, function or variable;\n\n"
697 f"Debugged import:\n\n{track_str}\n\n"
698 f"Original exception:\n\n{type(exception).__name__}: {exception}"
699 )
700 break
702 super().__init__(msg)
704 def __repr__(self) -> str:
705 return f"<{type(self).__name__}({self.import_name!r}, {self.exception!r})>"