Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/utils.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import io
4import mimetypes
5import os
6import pkgutil
7import re
8import sys
9import typing as t
10import unicodedata
11from datetime import datetime
12from time import time
13from urllib.parse import quote
14from zlib import adler32
16from markupsafe import escape
18from ._internal import _DictAccessorProperty
19from ._internal import _missing
20from ._internal import _TAccessorValue
21from .datastructures import Headers
22from .exceptions import NotFound
23from .exceptions import RequestedRangeNotSatisfiable
24from .security import safe_join
25from .wsgi import wrap_file
27if t.TYPE_CHECKING:
28 from _typeshed.wsgi import WSGIEnvironment
30 from .wrappers.request import Request
31 from .wrappers.response import Response
# Generic type variable for the value produced by a ``cached_property``.
_T = t.TypeVar("_T")

# Matches an HTML/XML entity reference such as ``&amp;`` and captures its name.
_entity_re = re.compile(r"&([^;]+);")
# Matches every character that is NOT allowed in a secured filename.
_filename_ascii_strip_re = re.compile(r"[^A-Za-z0-9_.-]")
# Reserved device names on Windows; a file may not be named after one of
# these (compared case-insensitively, without extension).
_windows_device_files = {
    "CON",
    "PRN",
    "AUX",
    "NUL",
    *(f"COM{i}" for i in range(10)),
    *(f"LPT{i}" for i in range(10)),
}
class cached_property(property, t.Generic[_T]):
    """A :func:`property` that is only evaluated once. Subsequent access
    returns the cached value. Setting the property sets the cached
    value. Deleting the property clears the cached value, accessing it
    again will evaluate it again. Deleting a value that was never cached
    is a no-op.

    .. code-block:: python

        class Example:
            @cached_property
            def value(self):
                # calculate something important here
                return 42

        e = Example()
        e.value  # evaluates
        e.value  # uses cache
        e.value = 16  # sets cache
        del e.value  # clears cache

    If the class defines ``__slots__``, it must add ``_cache_{name}`` as
    a slot. Alternatively, it can add ``__dict__``, but that's usually
    not desirable.

    .. versionchanged:: 2.1
        Works with ``__slots__``.

    .. versionchanged:: 2.0
        ``del obj.name`` clears the cached value.
    """

    def __init__(
        self,
        fget: t.Callable[[t.Any], _T],
        name: str | None = None,
        doc: str | None = None,
    ) -> None:
        """
        :param fget: The function computing the value on first access.
        :param name: Cache key / attribute name. Defaults to
            ``fget.__name__``.
        :param doc: Docstring for the property.
        """
        super().__init__(fget, doc=doc)
        self.__name__ = name or fget.__name__
        # Attribute used for storage on objects without a ``__dict__``.
        self.slot_name = f"_cache_{self.__name__}"
        self.__module__ = fget.__module__

    def __set__(self, obj: object, value: _T) -> None:
        """Store ``value`` as the cached value for ``obj``."""
        if hasattr(obj, "__dict__"):
            obj.__dict__[self.__name__] = value
        else:
            setattr(obj, self.slot_name, value)

    def __get__(self, obj: object, type: type = None) -> _T:  # type: ignore
        """Return the cached value, computing and storing it if needed.

        Accessing the descriptor on the class (``obj is None``) returns
        the descriptor itself.
        """
        if obj is None:
            return self  # type: ignore

        obj_dict = getattr(obj, "__dict__", None)

        if obj_dict is not None:
            value: _T = obj_dict.get(self.__name__, _missing)
        else:
            value = getattr(obj, self.slot_name, _missing)  # type: ignore[arg-type]

        if value is _missing:
            value = self.fget(obj)  # type: ignore

            if obj_dict is not None:
                obj.__dict__[self.__name__] = value
            else:
                setattr(obj, self.slot_name, value)

        return value

    def __delete__(self, obj: object) -> None:
        """Clear the cached value for ``obj``.

        Both storage strategies behave the same way: if nothing is
        cached, nothing happens.
        """
        if hasattr(obj, "__dict__"):
            # ``pop`` with a default instead of ``del`` so that deleting
            # an uncached value does not leak a ``KeyError``; this
            # matches the slot branch, which never raises.
            obj.__dict__.pop(self.__name__, None)
        else:
            setattr(obj, self.slot_name, _missing)
class environ_property(_DictAccessorProperty[_TAccessorValue]):
    """Maps request attributes to environment variables. This works not only
    for the Werkzeug request object, but also any other class with an
    environ attribute:

    >>> class Test(object):
    ...     environ = {'key': 'value'}
    ...     test = environ_property('key')
    >>> var = Test()
    >>> var.test
    'value'

    If you pass it a second value it's used as default if the key does not
    exist, the third one can be a converter that takes a value and converts
    it.  If it raises :exc:`ValueError` or :exc:`TypeError` the default value
    is used. If no default value is provided `None` is used.

    Per default the property is read only.  You have to explicitly enable it
    by passing ``read_only=False`` to the constructor.
    """

    # The WSGI environ is not written to unless explicitly requested.
    read_only = True

    def lookup(self, obj: Request) -> WSGIEnvironment:
        """Return the mapping the property reads from: ``obj.environ``."""
        return obj.environ
class header_property(_DictAccessorProperty[_TAccessorValue]):
    """Like `environ_property` but for headers: values are looked up in
    the ``headers`` attribute of the object instead of ``environ``.
    """

    def lookup(self, obj: Request | Response) -> Headers:
        """Return the mapping the property reads from: ``obj.headers``."""
        return obj.headers
# https://cgit.freedesktop.org/xdg/shared-mime-info/tree/freedesktop.org.xml.in
# https://www.iana.org/assignments/media-types/media-types.xhtml
# Types listed in the XDG mime info that have a charset in the IANA registration.
_charset_mimetypes = {
    "application/ecmascript",
    "application/javascript",
    "application/sql",
    "application/xml",
    "application/xml-dtd",
    "application/xml-external-parsed-entity",
}
def get_content_type(mimetype: str, charset: str) -> str:
    """Returns the full content type string with charset for a mimetype.

    If the mimetype represents text, the charset parameter will be
    appended, otherwise the mimetype is returned unchanged.

    :param mimetype: The mimetype to be used as content type.
    :param charset: The charset to be appended for text mimetypes.
    :return: The content type.

    .. versionchanged:: 0.15
        Any type that ends with ``+xml`` gets a charset, not just those
        that start with ``application/``. Known text types such as
        ``application/javascript`` are also given charsets.
    """
    # The three conditions are independent; the cheap prefix/suffix
    # checks run before the lookup in the table of known text types.
    if (
        mimetype.startswith("text/")
        or mimetype.endswith("+xml")
        or mimetype in _charset_mimetypes
    ):
        mimetype += f"; charset={charset}"

    return mimetype
def secure_filename(filename: str) -> str:
    r"""Pass it a filename and it will return a secure version of it.  This
    filename can then safely be stored on a regular file system and passed
    to :func:`os.path.join`.  The filename returned is an ASCII only string
    for maximum portability.

    On windows systems the function also makes sure that the file is not
    named after one of the special device files.

    >>> secure_filename("My cool movie.mov")
    'My_cool_movie.mov'
    >>> secure_filename("../../../etc/passwd")
    'etc_passwd'
    >>> secure_filename('i contain cool \xfcml\xe4uts.txt')
    'i_contain_cool_umlauts.txt'

    The function might return an empty filename.  It's your responsibility
    to ensure that the filename is unique and that you abort or
    generate a random filename if the function returned an empty one.

    .. versionadded:: 0.5

    :param filename: the filename to secure
    """
    # Decompose accented characters, then drop everything non-ASCII so
    # that e.g. "ü" degrades to "u" instead of disappearing.
    filename = unicodedata.normalize("NFKD", filename)
    filename = filename.encode("ascii", "ignore").decode("ascii")

    # Path separators become whitespace, which is collapsed to "_" below.
    for sep in os.sep, os.path.altsep:
        if sep:
            filename = filename.replace(sep, " ")

    # Join whitespace-separated parts with "_", remove any character
    # outside the allowed set, then strip leading/trailing dots and
    # underscores (which also neutralizes ".." components).
    filename = str(_filename_ascii_strip_re.sub("", "_".join(filename.split()))).strip(
        "._"
    )

    # on nt a couple of special files are present in each folder.  We
    # have to ensure that the target file is not such a filename.  In
    # this case we prepend an underline
    if (
        os.name == "nt"
        and filename
        and filename.split(".")[0].upper() in _windows_device_files
    ):
        filename = f"_{filename}"

    return filename
def redirect(
    location: str, code: int = 302, Response: type[Response] | None = None
) -> Response:
    """Returns a response object (a WSGI application) that, if called,
    redirects the client to the target location. Supported codes are
    301, 302, 303, 305, 307, and 308. 300 is not supported because
    it's not a real redirect and 304 because it's the answer for a
    request with defined If-Modified-Since headers.

    .. versionadded:: 0.6
        The location can now be a unicode string that is encoded using
        the :func:`iri_to_uri` function.

    .. versionadded:: 0.10
        The class used for the Response object can now be passed in.

    :param location: the location the response should redirect to.
    :param code: the redirect status code. defaults to 302.
    :param class Response: a Response class to use when instantiating a
        response. The default is :class:`werkzeug.wrappers.Response` if
        unspecified.
    :return: an instance of the response class with the ``Location``
        header set to ``location`` and a small HTML body linking to it.
    """
    if Response is None:
        # Imported lazily, only when no response class was supplied.
        from .wrappers import Response

    # The location is escaped before it is interpolated into the HTML
    # body; the header below receives the unescaped value.
    html_location = escape(location)
    response = Response(  # type: ignore[misc]
        "<!doctype html>\n"
        "<html lang=en>\n"
        "<title>Redirecting...</title>\n"
        "<h1>Redirecting...</h1>\n"
        "<p>You should be redirected automatically to the target URL: "
        f'<a href="{html_location}">{html_location}</a>. If not, click the link.\n',
        code,
        mimetype="text/html",
    )
    response.headers["Location"] = location
    return response
def append_slash_redirect(environ: WSGIEnvironment, code: int = 308) -> Response:
    """Redirect to the current URL with a slash appended.

    If the current URL is ``/user/42``, the redirect URL will be
    ``42/``. When joined to the current URL during response
    processing or by the browser, this will produce ``/user/42/``.

    The behavior is undefined if the path ends with a slash already. If
    called unconditionally on a URL, it may produce a redirect loop.

    :param environ: Use the path and query from this WSGI environment
        to produce the redirect URL.
    :param code: the status code for the redirect.

    .. versionchanged:: 2.1
        Produce a relative URL that only modifies the last segment.
        Relevant when the current path has multiple segments.

    .. versionchanged:: 2.1
        The default status code is 308 instead of 301. This preserves
        the request method and body.
    """
    # Only the last path segment is needed for a relative redirect.
    tail = environ["PATH_INFO"].rpartition("/")[2]

    if not tail:
        new_path = "./"
    else:
        new_path = f"{tail}/"

    query_string = environ.get("QUERY_STRING")

    if query_string:
        new_path = f"{new_path}?{query_string}"

    return redirect(new_path, code)
def send_file(
    path_or_file: os.PathLike[str] | str | t.IO[bytes],
    environ: WSGIEnvironment,
    mimetype: str | None = None,
    as_attachment: bool = False,
    download_name: str | None = None,
    conditional: bool = True,
    etag: bool | str = True,
    last_modified: datetime | int | float | None = None,
    max_age: None | (int | t.Callable[[str | None], int | None]) = None,
    use_x_sendfile: bool = False,
    response_class: type[Response] | None = None,
    _root_path: os.PathLike[str] | str | None = None,
) -> Response:
    """Send the contents of a file to the client.

    The first argument can be a file path or a file-like object. Paths
    are preferred in most cases because Werkzeug can manage the file and
    get extra information from the path. Passing a file-like object
    requires that the file is opened in binary mode, and is mostly
    useful when building a file in memory with :class:`io.BytesIO`.

    Never pass file paths provided by a user. The path is assumed to be
    trusted, so a user could craft a path to access a file you didn't
    intend. Use :func:`send_from_directory` to safely serve user-provided paths.

    If the WSGI server sets a ``file_wrapper`` in ``environ``, it is
    used, otherwise Werkzeug's built-in wrapper is used. Alternatively,
    if the HTTP server supports ``X-Sendfile``, ``use_x_sendfile=True``
    will tell the server to send the given path, which is much more
    efficient than reading it in Python.

    :param path_or_file: The path to the file to send, relative to the
        current working directory if a relative path is given.
        Alternatively, a file-like object opened in binary mode. Make
        sure the file pointer is seeked to the start of the data.
    :param environ: The WSGI environ for the current request.
    :param mimetype: The MIME type to send for the file. If not
        provided, it will try to detect it from the file name.
    :param as_attachment: Indicate to a browser that it should offer to
        save the file instead of displaying it.
    :param download_name: The default name browsers will use when saving
        the file. Defaults to the passed file name.
    :param conditional: Enable conditional and range responses based on
        request headers. Requires passing a file path and ``environ``.
    :param etag: Calculate an ETag for the file, which requires passing
        a file path. Can also be a string to use instead.
    :param last_modified: The last modified time to send for the file,
        in seconds. If not provided, it will try to detect it from the
        file path.
    :param max_age: How long the client should cache the file, in
        seconds. If set, ``Cache-Control`` will be ``public``, otherwise
        it will be ``no-cache`` to prefer conditional caching.
    :param use_x_sendfile: Set the ``X-Sendfile`` header to let the
        server to efficiently send the file. Requires support from the
        HTTP server. Requires passing a file path.
    :param response_class: Build the response using this class. Defaults
        to :class:`~werkzeug.wrappers.Response`.
    :param _root_path: Do not use. For internal use only. Use
        :func:`send_from_directory` to safely send files under a path.
    :raises TypeError: If the MIME type must be guessed, or
        ``as_attachment`` is set, but no file name is available.
    :raises ValueError: If a text-mode file object is passed.

    .. versionchanged:: 2.0.2
        ``send_file`` only sets a detected ``Content-Encoding`` if
        ``as_attachment`` is disabled.

    .. versionadded:: 2.0
        Adapted from Flask's implementation.

    .. versionchanged:: 2.0
        ``download_name`` replaces Flask's ``attachment_filename``
         parameter. If ``as_attachment=False``, it is passed with
         ``Content-Disposition: inline`` instead.

    .. versionchanged:: 2.0
        ``max_age`` replaces Flask's ``cache_timeout`` parameter.
        ``conditional`` is enabled and ``max_age`` is not set by
        default.

    .. versionchanged:: 2.0
        ``etag`` replaces Flask's ``add_etags`` parameter. It can be a
        string to use instead of generating one.

    .. versionchanged:: 2.0
        If an encoding is returned when guessing ``mimetype`` from
        ``download_name``, set the ``Content-Encoding`` header.
    """
    if response_class is None:
        from .wrappers import Response

        response_class = Response

    # Exactly one of ``path`` / ``file`` is set by the branch below;
    # ``size`` and ``mtime`` are only known up front for real paths.
    path: str | None = None
    file: t.IO[bytes] | None = None
    size: int | None = None
    mtime: float | None = None
    headers = Headers()

    if isinstance(path_or_file, (os.PathLike, str)) or hasattr(
        path_or_file, "__fspath__"
    ):
        path_or_file = t.cast("t.Union[os.PathLike[str], str]", path_or_file)

        # Flask will pass app.root_path, allowing its send_file wrapper
        # to not have to deal with paths.
        if _root_path is not None:
            path = os.path.join(_root_path, path_or_file)
        else:
            path = os.path.abspath(path_or_file)

        stat = os.stat(path)
        size = stat.st_size
        mtime = stat.st_mtime
    else:
        file = path_or_file

    if download_name is None and path is not None:
        download_name = os.path.basename(path)

    if mimetype is None:
        if download_name is None:
            raise TypeError(
                "Unable to detect the MIME type because a file name is"
                " not available. Either set 'download_name', pass a"
                " path instead of a file, or set 'mimetype'."
            )

        mimetype, encoding = mimetypes.guess_type(download_name)

        if mimetype is None:
            mimetype = "application/octet-stream"

        # Don't send encoding for attachments, it causes browsers to
        # save decompressed tar.gz files.
        if encoding is not None and not as_attachment:
            headers.set("Content-Encoding", encoding)

    if download_name is not None:
        try:
            download_name.encode("ascii")
        except UnicodeEncodeError:
            # Non-ASCII name: send an ASCII approximation as ``filename``
            # and the percent-encoded original as ``filename*``.
            simple = unicodedata.normalize("NFKD", download_name)
            simple = simple.encode("ascii", "ignore").decode("ascii")
            # safe = RFC 5987 attr-char
            quoted = quote(download_name, safe="!#$&+-.^_`|~")
            names = {"filename": simple, "filename*": f"UTF-8''{quoted}"}
        else:
            names = {"filename": download_name}

        value = "attachment" if as_attachment else "inline"
        headers.set("Content-Disposition", value, **names)
    elif as_attachment:
        raise TypeError(
            "No name provided for attachment. Either set"
            " 'download_name' or pass a path instead of a file."
        )

    if use_x_sendfile and path is not None:
        # The HTTP server sends the file itself; no body from Python.
        headers["X-Sendfile"] = path
        data = None
    else:
        if file is None:
            file = open(path, "rb")  # type: ignore
        elif isinstance(file, io.BytesIO):
            size = file.getbuffer().nbytes
        elif isinstance(file, io.TextIOBase):
            raise ValueError("Files must be opened in binary mode or use BytesIO.")

        data = wrap_file(environ, file)

    rv = response_class(
        data, mimetype=mimetype, headers=headers, direct_passthrough=True
    )

    if size is not None:
        rv.content_length = size

    if last_modified is not None:
        rv.last_modified = last_modified  # type: ignore
    elif mtime is not None:
        rv.last_modified = mtime  # type: ignore

    # Default to conditional caching; relaxed below if max_age > 0.
    rv.cache_control.no_cache = True

    # Flask will pass app.get_send_file_max_age, allowing its send_file
    # wrapper to not have to deal with paths.
    if callable(max_age):
        max_age = max_age(path)

    if max_age is not None:
        if max_age > 0:
            rv.cache_control.no_cache = None
            rv.cache_control.public = True

        rv.cache_control.max_age = max_age
        rv.expires = int(time() + max_age)  # type: ignore

    if isinstance(etag, str):
        rv.set_etag(etag)
    elif etag and path is not None:
        # Generated ETag: modification time, size, and a checksum of the
        # path string.
        check = adler32(path.encode()) & 0xFFFFFFFF
        rv.set_etag(f"{mtime}-{size}-{check}")

    if conditional:
        try:
            rv = rv.make_conditional(environ, accept_ranges=True, complete_length=size)
        except RequestedRangeNotSatisfiable:
            # The response is discarded, so close the file before
            # propagating the error.
            if file is not None:
                file.close()

            raise

        # Some x-sendfile implementations incorrectly ignore the 304
        # status code and send the file anyway.
        if rv.status_code == 304:
            rv.headers.pop("x-sendfile", None)

    return rv
def send_from_directory(
    directory: os.PathLike[str] | str,
    path: os.PathLike[str] | str,
    environ: WSGIEnvironment,
    **kwargs: t.Any,
) -> Response:
    """Send a file from within a directory using :func:`send_file`.

    This is a secure way to serve files from a folder, such as static
    files or uploads. Uses :func:`~werkzeug.security.safe_join` to
    ensure the path coming from the client is not maliciously crafted to
    point outside the specified directory.

    If the final path does not point to an existing regular file,
    returns a 404 :exc:`~werkzeug.exceptions.NotFound` error.

    :param directory: The directory that ``path`` must be located under. This *must not*
        be a value provided by the client, otherwise it becomes insecure.
    :param path: The path to the file to send, relative to ``directory``. This is the
        part of the path provided by the client, which is checked for security.
    :param environ: The WSGI environ for the current request.
    :param kwargs: Arguments to pass to :func:`send_file`.
    :raises NotFound: If the joined path is rejected by ``safe_join`` or
        is not an existing regular file.

    .. versionadded:: 2.0
        Adapted from Flask's implementation.
    """
    path_str = safe_join(os.fspath(directory), os.fspath(path))

    # ``safe_join`` signals a rejected path by returning ``None``.
    if path_str is None:
        raise NotFound()

    # Flask will pass app.root_path, allowing its send_from_directory
    # wrapper to not have to deal with paths.
    if "_root_path" in kwargs:
        path_str = os.path.join(kwargs["_root_path"], path_str)

    if not os.path.isfile(path_str):
        raise NotFound()

    return send_file(path_str, environ, **kwargs)
def import_string(import_name: str, silent: bool = False) -> t.Any:
    """Imports an object based on a string.  This is useful if you want to
    use import paths as endpoints or something similar.  An import path can
    be specified either in dotted notation (``xml.sax.saxutils.escape``)
    or with a colon as object delimiter (``xml.sax.saxutils:escape``).

    If `silent` is True the return value will be `None` if the import fails.

    :param import_name: the dotted name for the object to import.
    :param silent: if set to `True` import errors are ignored and
                   `None` is returned instead.
    :return: imported object
    :raises ImportStringError: if the import fails and `silent` is false.
    """
    import_name = import_name.replace(":", ".")
    try:
        # First try the whole name as a module.
        try:
            __import__(import_name)
        except ImportError:
            # A name without a dot cannot be "module.attribute", so
            # there is nothing else to try.
            if "." not in import_name:
                raise
        else:
            return sys.modules[import_name]

        # Otherwise treat the last component as an attribute of the
        # module named by everything before it.
        module_name, obj_name = import_name.rsplit(".", 1)
        module = __import__(module_name, globals(), locals(), [obj_name])
        try:
            return getattr(module, obj_name)
        except AttributeError as e:
            # Normalize to ImportError so the handler below covers it.
            raise ImportError(e) from None

    except ImportError as e:
        if not silent:
            raise ImportStringError(import_name, e).with_traceback(
                sys.exc_info()[2]
            ) from None

    return None
def find_modules(
    import_path: str, include_packages: bool = False, recursive: bool = False
) -> t.Iterator[str]:
    """Finds all the modules below a package.  This can be useful to
    automatically import all views / controllers so that their metaclasses /
    function decorators have a chance to register themselves on the
    application.

    Packages are not returned unless `include_packages` is `True`.  This can
    also recursively list modules but in that case it will import all the
    packages to get the correct load path of that module.

    :param import_path: the dotted name for the package to find child modules.
    :param include_packages: set to `True` if packages should be returned, too.
    :param recursive: set to `True` if recursion should happen.
    :return: generator
    :raises ValueError: if ``import_path`` does not name a package.
    """
    module = import_string(import_path)
    # Only packages have ``__path__``.
    path = getattr(module, "__path__", None)
    if path is None:
        raise ValueError(f"{import_path!r} is not a package")
    basename = f"{module.__name__}."
    for _importer, modname, ispkg in pkgutil.iter_modules(path):
        modname = basename + modname
        if ispkg:
            if include_packages:
                yield modname
            if recursive:
                yield from find_modules(modname, include_packages, True)
        else:
            yield modname
class ImportStringError(ImportError):
    """Provides information about a failed :func:`import_string` attempt."""

    #: String in dotted notation that failed to be imported.
    import_name: str
    #: Wrapped exception.
    exception: BaseException

    def __init__(self, import_name: str, exception: BaseException) -> None:
        """Build a message that shows how far the import path resolved.

        :param import_name: the name that failed to be imported.
        :param exception: the exception raised by the failed import.
        """
        self.import_name = import_name
        self.exception = exception
        msg = import_name
        name = ""
        tracked = []
        # Import each prefix of the dotted path in turn to find the
        # first component that cannot be resolved.
        for part in import_name.replace(":", ".").split("."):
            name = f"{name}.{part}" if name else part
            imported = import_string(name, silent=True)
            # ``None`` is what a silent import returns on failure. Test
            # for it explicitly: a successfully imported object may be
            # falsy (``0``, ``""``, ``False``, an empty container) and
            # must not be reported as "not found".
            if imported is not None:
                tracked.append((name, getattr(imported, "__file__", None)))
            else:
                track = [f"- {n!r} found in {i!r}." for n, i in tracked]
                track.append(f"- {name!r} not found.")
                track_str = "\n".join(track)
                msg = (
                    f"import_string() failed for {import_name!r}. Possible reasons"
                    f" are:\n\n"
                    "- missing __init__.py in a package;\n"
                    "- package or module path not included in sys.path;\n"
                    "- duplicated package or module name taking precedence in"
                    " sys.path;\n"
                    "- missing module, class, function or variable;\n\n"
                    f"Debugged import:\n\n{track_str}\n\n"
                    f"Original exception:\n\n{type(exception).__name__}: {exception}"
                )
                break

        super().__init__(msg)

    def __repr__(self) -> str:
        return f"<{type(self).__name__}({self.import_name!r}, {self.exception!r})>"