Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/utils.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import io
4import mimetypes
5import os
6import pkgutil
7import re
8import sys
9import typing as t
10import unicodedata
11from datetime import datetime
12from time import time
13from urllib.parse import quote
14from zlib import adler32
16from markupsafe import escape
18from ._internal import _DictAccessorProperty
19from ._internal import _missing
20from ._internal import _TAccessorValue
21from .datastructures import Headers
22from .exceptions import NotFound
23from .exceptions import RequestedRangeNotSatisfiable
24from .security import _windows_device_files
25from .security import safe_join
26from .wsgi import wrap_file
28if t.TYPE_CHECKING:
29 from _typeshed.wsgi import WSGIEnvironment
31 from .wrappers.request import Request
32 from .wrappers.response import Response
# Type variable for the value produced by a ``cached_property`` getter.
_T = t.TypeVar("_T")

# Matches ``&...;`` and captures the text between the ampersand and the
# semicolon.
_entity_re = re.compile(r"&([^;]+);")
# Matches any single character that is not an ASCII letter, a digit, or one
# of ``_``, ``.`` and ``-``.
_filename_ascii_strip_re = re.compile(r"[^A-Za-z0-9_.-]")
class cached_property(property, t.Generic[_T]):
    """A :func:`property` that is only evaluated once. Subsequent access
    returns the cached value. Setting the property sets the cached
    value. Deleting the property clears the cached value, accessing it
    again will evaluate it again. Deleting when nothing is cached is a
    no-op.

    .. code-block:: python

        class Example:
            @cached_property
            def value(self):
                # calculate something important here
                return 42

        e = Example()
        e.value  # evaluates
        e.value  # uses cache
        e.value = 16  # sets cache
        del e.value  # clears cache

    If the class defines ``__slots__``, it must add ``_cache_{name}`` as
    a slot. Alternatively, it can add ``__dict__``, but that's usually
    not desirable.

    .. versionchanged:: 2.1
        Works with ``__slots__``.

    .. versionchanged:: 2.0
        ``del obj.name`` clears the cached value.
    """

    def __init__(
        self,
        fget: t.Callable[[t.Any], _T],
        name: str | None = None,
        doc: str | None = None,
    ) -> None:
        """
        :param fget: The function that computes the value.
        :param name: Key to cache under. Defaults to ``fget.__name__``.
        :param doc: Docstring passed on to :class:`property`.
        """
        super().__init__(fget, doc=doc)
        self.__name__ = name or fget.__name__
        # Attribute used for the cache when the instance has no ``__dict__``.
        self.slot_name = f"_cache_{self.__name__}"
        self.__module__ = fget.__module__

    def __set__(self, obj: object, value: _T) -> None:
        """Store ``value`` as the cached value for ``obj``."""
        if hasattr(obj, "__dict__"):
            obj.__dict__[self.__name__] = value
        else:
            setattr(obj, self.slot_name, value)

    def __get__(self, obj: object, type: type = None) -> _T:  # type: ignore
        """Return the cached value for ``obj``, computing it on first use.

        Accessed on the class (``obj is None``), the descriptor itself is
        returned.
        """
        if obj is None:
            return self  # type: ignore

        obj_dict = getattr(obj, "__dict__", None)

        if obj_dict is not None:
            value: _T = obj_dict.get(self.__name__, _missing)
        else:
            value = getattr(obj, self.slot_name, _missing)  # type: ignore[arg-type]

        if value is _missing:
            value = self.fget(obj)  # type: ignore

            if obj_dict is not None:
                obj.__dict__[self.__name__] = value
            else:
                setattr(obj, self.slot_name, value)

        return value

    def __delete__(self, obj: object) -> None:
        """Forget the cached value for ``obj``, if there is one."""
        if hasattr(obj, "__dict__"):
            # ``pop`` with a default so that clearing an empty cache does
            # not raise ``KeyError``; this matches the slot branch below,
            # which also succeeds when nothing has been cached yet.
            obj.__dict__.pop(self.__name__, None)
        else:
            setattr(obj, self.slot_name, _missing)
class environ_property(_DictAccessorProperty[_TAccessorValue]):
    """Expose a key of an object's ``environ`` mapping as an attribute.

    It is meant for the Werkzeug request object, but works on any class
    that has an ``environ`` attribute:

    >>> class Test(object):
    ...     environ = {'key': 'value'}
    ...     test = environ_property('key')
    >>> var = Test()
    >>> var.test
    'value'

    A second argument is used as the default when the key does not
    exist, and a third one can be a converter that takes a value and
    converts it. If the converter raises :exc:`ValueError` or
    :exc:`TypeError` the default value is used. Without an explicit
    default, `None` is used.

    The property is read only by default. Pass ``read_only=False`` to
    the constructor to make it writable.
    """

    read_only = True

    def lookup(self, obj: Request) -> WSGIEnvironment:
        """Return the mapping to read from: ``obj.environ``."""
        return obj.environ
class header_property(_DictAccessorProperty[_TAccessorValue]):
    """Like `environ_property`, but reads from ``obj.headers``."""

    def lookup(self, obj: Request | Response) -> Headers:  # type: ignore[override]
        """Return the mapping to read from: ``obj.headers``."""
        return obj.headers
# https://cgit.freedesktop.org/xdg/shared-mime-info/tree/freedesktop.org.xml.in
# https://www.iana.org/assignments/media-types/media-types.xhtml
# Types listed in the XDG mime info that have a charset in the IANA registration.
_charset_mimetypes = {
    "application/ecmascript",
    "application/javascript",
    "application/sql",
    "application/xml",
    "application/xml-dtd",
    "application/xml-external-parsed-entity",
}


def get_content_type(mimetype: str, charset: str) -> str:
    """Build the content type string for a mimetype, adding a charset
    when the type is textual.

    A type counts as textual when it starts with ``text/``, ends with
    ``+xml``, or is one of the known types in ``_charset_mimetypes``.
    Any other mimetype is returned unchanged.

    :param mimetype: The mimetype to be used as content type.
    :param charset: The charset to be appended for text mimetypes.
    :return: The content type.

    .. versionchanged:: 0.15
        Any type that ends with ``+xml`` gets a charset, not just those
        that start with ``application/``. Known text types such as
        ``application/javascript`` are also given charsets.
    """
    is_textual = (
        mimetype.startswith("text/")
        or mimetype in _charset_mimetypes
        or mimetype.endswith("+xml")
    )

    if not is_textual:
        return mimetype

    return f"{mimetype}; charset={charset}"
def secure_filename(filename: str) -> str:
    r"""Return a secure version of ``filename``. The result can safely
    be stored on a regular file system and passed to
    :func:`os.path.join`. It is an ASCII only string for maximum
    portability.

    On windows systems the function also makes sure that the file is not
    named after one of the special device files.

    >>> secure_filename("My cool movie.mov")
    'My_cool_movie.mov'
    >>> secure_filename("../../../etc/passwd")
    'etc_passwd'
    >>> secure_filename('i contain cool \xfcml\xe4uts.txt')
    'i_contain_cool_umlauts.txt'

    The function might return an empty filename. It's your responsibility
    to ensure that the filename is unique and that you abort or
    generate a random filename if the function returned an empty one.

    .. versionadded:: 0.5

    :param filename: the filename to secure
    """
    # Decompose accented characters, then drop whatever is not ASCII.
    ascii_name = (
        unicodedata.normalize("NFKD", filename)
        .encode("ascii", "ignore")
        .decode("ascii")
    )

    # Path separators become whitespace so they end up as underscores below.
    for separator in (os.sep, os.path.altsep):
        if separator:
            ascii_name = ascii_name.replace(separator, " ")

    underscored = "_".join(ascii_name.split())
    cleaned = str(_filename_ascii_strip_re.sub("", underscored)).strip("._")

    # on nt a couple of special files are present in each folder. We
    # have to ensure that the target file is not such a filename. In
    # this case we prepend an underline
    if os.name == "nt" and cleaned:
        stem = cleaned.split(".")[0].upper()

        if stem in _windows_device_files:
            return f"_{cleaned}"

    return cleaned
def redirect(
    location: str, code: int = 302, Response: type[Response] | None = None
) -> Response:
    """Returns a response object (a WSGI application) that, if called,
    redirects the client to the target location. Supported codes are
    301, 302, 303, 305, 307, and 308. 300 is not supported because
    it's not a real redirect, and 304 because it's the answer for a
    request with defined If-Modified-Since headers.

    .. versionadded:: 0.6
        The location can now be a unicode string that is encoded using
        the :func:`iri_to_uri` function.

    .. versionadded:: 0.10
        The class used for the Response object can now be passed in.

    :param location: the location the response should redirect to.
    :param code: the redirect status code. defaults to 302.
    :param class Response: a Response class to use when instantiating a
        response. The default is :class:`werkzeug.wrappers.Response` if
        unspecified.
    """
    if Response is None:
        from .wrappers import Response

    # Only the HTML body uses the escaped form; the header below gets the
    # location exactly as passed in.
    escaped = escape(location)
    body = (
        "<!doctype html>\n"
        "<html lang=en>\n"
        "<title>Redirecting...</title>\n"
        "<h1>Redirecting...</h1>\n"
        "<p>You should be redirected automatically to the target URL: "
        f'<a href="{escaped}">{escaped}</a>. If not, click the link.\n'
    )
    response = Response(body, code, mimetype="text/html")  # type: ignore[misc]
    response.headers["Location"] = location
    return response
def append_slash_redirect(environ: WSGIEnvironment, code: int = 308) -> Response:
    """Redirect to the current URL with a slash appended.

    If the current URL is ``/user/42``, the redirect URL will be
    ``42/``. When joined to the current URL during response
    processing or by the browser, this will produce ``/user/42/``.

    The behavior is undefined if the path ends with a slash already. If
    called unconditionally on a URL, it may produce a redirect loop.

    :param environ: Use the path and query from this WSGI environment
        to produce the redirect URL. A missing ``PATH_INFO`` is treated
        like an empty one.
    :param code: the status code for the redirect.

    .. versionchanged:: 2.1
        Produce a relative URL that only modifies the last segment.
        Relevant when the current path has multiple segments.

    .. versionchanged:: 2.1
        The default status code is 308 instead of 301. This preserves
        the request method and body.
    """
    # A WSGI server may omit PATH_INFO when its value would be the empty
    # string, so read it with a default instead of indexing.
    tail = environ.get("PATH_INFO", "").rpartition("/")[2]
    new_path = f"{tail}/" if tail else "./"

    query_string = environ.get("QUERY_STRING")

    if query_string:
        new_path = f"{new_path}?{query_string}"

    return redirect(new_path, code)
def send_file(
    path_or_file: os.PathLike[str] | str | t.IO[bytes],
    environ: WSGIEnvironment,
    mimetype: str | None = None,
    as_attachment: bool = False,
    download_name: str | None = None,
    conditional: bool = True,
    etag: bool | str = True,
    last_modified: datetime | int | float | None = None,
    max_age: None | (int | t.Callable[[str | None], int | None]) = None,
    use_x_sendfile: bool = False,
    response_class: type[Response] | None = None,
    _root_path: os.PathLike[str] | str | None = None,
) -> Response:
    """Send the contents of a file to the client.

    The first argument can be a file path or a file-like object. Paths
    are preferred in most cases because Werkzeug can manage the file and
    get extra information from the path. Passing a file-like object
    requires that the file is opened in binary mode, and is mostly
    useful when building a file in memory with :class:`io.BytesIO`.

    Never pass file paths provided by a user. The path is assumed to be
    trusted, so a user could craft a path to access a file you didn't
    intend. Use :func:`send_from_directory` to safely serve user-provided paths.

    If the WSGI server sets a ``file_wrapper`` in ``environ``, it is
    used, otherwise Werkzeug's built-in wrapper is used. Alternatively,
    if the HTTP server supports ``X-Sendfile``, ``use_x_sendfile=True``
    will tell the server to send the given path, which is much more
    efficient than reading it in Python.

    If building the response fails after this function opened the file
    from a path, the file is closed again before the exception
    propagates.

    :param path_or_file: The path to the file to send, relative to the
        current working directory if a relative path is given.
        Alternatively, a file-like object opened in binary mode. Make
        sure the file pointer is seeked to the start of the data.
    :param environ: The WSGI environ for the current request.
    :param mimetype: The MIME type to send for the file. If not
        provided, it will try to detect it from the file name.
    :param as_attachment: Indicate to a browser that it should offer to
        save the file instead of displaying it.
    :param download_name: The default name browsers will use when saving
        the file. Defaults to the passed file name.
    :param conditional: Enable conditional and range responses based on
        request headers. Requires passing a file path and ``environ``.
    :param etag: Calculate an ETag for the file, which requires passing
        a file path. Can also be a string to use instead.
    :param last_modified: The last modified time to send for the file,
        in seconds. If not provided, it will try to detect it from the
        file path.
    :param max_age: How long the client should cache the file, in
        seconds. If set, ``Cache-Control`` will be ``public``, otherwise
        it will be ``no-cache`` to prefer conditional caching.
    :param use_x_sendfile: Set the ``X-Sendfile`` header to let the
        server to efficiently send the file. Requires support from the
        HTTP server. Requires passing a file path.
    :param response_class: Build the response using this class. Defaults
        to :class:`~werkzeug.wrappers.Response`.
    :param _root_path: Do not use. For internal use only. Use
        :func:`send_from_directory` to safely send files under a path.

    .. versionchanged:: 2.0.2
        ``send_file`` only sets a detected ``Content-Encoding`` if
        ``as_attachment`` is disabled.

    .. versionadded:: 2.0
        Adapted from Flask's implementation.

    .. versionchanged:: 2.0
        ``download_name`` replaces Flask's ``attachment_filename``
         parameter. If ``as_attachment=False``, it is passed with
         ``Content-Disposition: inline`` instead.

    .. versionchanged:: 2.0
        ``max_age`` replaces Flask's ``cache_timeout`` parameter.
        ``conditional`` is enabled and ``max_age`` is not set by
        default.

    .. versionchanged:: 2.0
        ``etag`` replaces Flask's ``add_etags`` parameter. It can be a
        string to use instead of generating one.

    .. versionchanged:: 2.0
        If an encoding is returned when guessing ``mimetype`` from
        ``download_name``, set the ``Content-Encoding`` header.
    """
    if response_class is None:
        from .wrappers import Response

        response_class = Response

    path: str | None = None
    file: t.IO[bytes] | None = None
    size: int | None = None
    mtime: float | None = None
    headers = Headers()

    if isinstance(path_or_file, (os.PathLike, str)) or hasattr(
        path_or_file, "__fspath__"
    ):
        path_or_file = t.cast("t.Union[os.PathLike[str], str]", path_or_file)

        # Flask will pass app.root_path, allowing its send_file wrapper
        # to not have to deal with paths.
        if _root_path is not None:
            path = os.path.join(_root_path, path_or_file)
        else:
            path = os.path.abspath(path_or_file)

        stat = os.stat(path)
        size = stat.st_size
        mtime = stat.st_mtime
    else:
        file = path_or_file

    if download_name is None and path is not None:
        download_name = os.path.basename(path)

    if mimetype is None:
        if download_name is None:
            raise TypeError(
                "Unable to detect the MIME type because a file name is"
                " not available. Either set 'download_name', pass a"
                " path instead of a file, or set 'mimetype'."
            )

        mimetype, encoding = mimetypes.guess_type(download_name)

        if mimetype is None:
            mimetype = "application/octet-stream"

        # Don't send encoding for attachments, it causes browsers to
        # decompress tar.gz files when saving them.
        if encoding is not None and not as_attachment:
            headers.set("Content-Encoding", encoding)

    if download_name is not None:
        try:
            download_name.encode("ascii")
        except UnicodeEncodeError:
            # Send an ASCII-only fallback name plus the full name in the
            # percent-encoded ``filename*`` form.
            simple = unicodedata.normalize("NFKD", download_name)
            simple = simple.encode("ascii", "ignore").decode("ascii")
            # safe = RFC 5987 attr-char
            quoted = quote(download_name, safe="!#$&+-.^_`|~")
            names = {"filename": simple, "filename*": f"UTF-8''{quoted}"}
        else:
            names = {"filename": download_name}

        value = "attachment" if as_attachment else "inline"
        headers.set("Content-Disposition", value, **names)
    elif as_attachment:
        raise TypeError(
            "No name provided for attachment. Either set"
            " 'download_name' or pass a path instead of a file."
        )

    # Set only when the file is opened below. A file object supplied by
    # the caller is not closed by the generic cleanup at the end.
    opened_here = False

    try:
        if use_x_sendfile and path is not None:
            headers["X-Sendfile"] = path
            data = None
        else:
            if file is None:
                file = open(path, "rb")  # type: ignore
                opened_here = True
            elif isinstance(file, io.BytesIO):
                size = file.getbuffer().nbytes
            elif isinstance(file, io.TextIOBase):
                raise ValueError("Files must be opened in binary mode or use BytesIO.")

            data = wrap_file(environ, file)

        rv = response_class(
            data, mimetype=mimetype, headers=headers, direct_passthrough=True
        )

        if size is not None:
            rv.content_length = size

        if last_modified is not None:
            rv.last_modified = last_modified  # type: ignore
        elif mtime is not None:
            rv.last_modified = mtime  # type: ignore

        rv.cache_control.no_cache = True

        # Flask will pass app.get_send_file_max_age, allowing its send_file
        # wrapper to not have to deal with paths.
        if callable(max_age):
            max_age = max_age(path)

        if max_age is not None:
            if max_age > 0:
                rv.cache_control.no_cache = None
                rv.cache_control.public = True

            rv.cache_control.max_age = max_age
            rv.expires = int(time() + max_age)  # type: ignore

        if isinstance(etag, str):
            rv.set_etag(etag)
        elif etag and path is not None:
            check = adler32(path.encode()) & 0xFFFFFFFF
            rv.set_etag(f"{mtime}-{size}-{check}")

        if conditional:
            try:
                rv = rv.make_conditional(
                    environ, accept_ranges=True, complete_length=size
                )
            except RequestedRangeNotSatisfiable:
                if file is not None:
                    file.close()

                raise

            # Some x-sendfile implementations incorrectly ignore the 304
            # status code and send the file anyway.
            if rv.status_code == 304:
                rv.headers.pop("x-sendfile", None)
    except BaseException:
        # Do not leak a handle that was opened here when anything above
        # fails. Closing an already closed file is harmless.
        if opened_here and file is not None:
            file.close()

        raise

    return rv
def send_from_directory(
    directory: os.PathLike[str] | str,
    path: os.PathLike[str] | str,
    environ: WSGIEnvironment,
    **kwargs: t.Any,
) -> Response:
    """Send a file from within a directory using :func:`send_file`.

    This is a secure way to serve files from a folder, such as static
    files or uploads. Uses :func:`~werkzeug.security.safe_join` to
    ensure the path coming from the client is not maliciously crafted to
    point outside the specified directory.

    If the final path does not point to an existing regular file,
    returns a 404 :exc:`~werkzeug.exceptions.NotFound` error.

    :param directory: The directory that ``path`` must be located under. This *must not*
        be a value provided by the client, otherwise it becomes insecure.
    :param path: The path to the file to send, relative to ``directory``. This is the
        part of the path provided by the client, which is checked for security.
    :param environ: The WSGI environ for the current request.
    :param kwargs: Arguments to pass to :func:`send_file`.

    .. versionadded:: 2.0
        Adapted from Flask's implementation.
    """
    path_str = safe_join(os.fspath(directory), os.fspath(path))

    if path_str is None:
        raise NotFound()

    # Flask will pass app.root_path, allowing its send_from_directory
    # wrapper to not have to deal with paths.
    #
    # ``_root_path`` stays in ``kwargs`` and is applied by ``send_file``.
    # It is joined here only for the existence check, so the root is applied
    # exactly once to the path that is sent, even when it is relative.
    # ``None`` means "no root", as it does in ``send_file``.
    root_path = kwargs.get("_root_path")

    if root_path is not None:
        check_path = os.path.join(root_path, path_str)
    else:
        check_path = path_str

    if not os.path.isfile(check_path):
        raise NotFound()

    return send_file(path_str, environ, **kwargs)
def import_string(import_name: str, silent: bool = False) -> t.Any:
    """Import an object from its string name. This is useful if you want
    to use import paths as endpoints or something similar. An import path
    can be specified either in dotted notation
    (``xml.sax.saxutils.escape``) or with a colon as object delimiter
    (``xml.sax.saxutils:escape``).

    If `silent` is True the return value will be `None` if the import fails.

    :param import_name: the dotted name for the object to import.
    :param silent: if set to `True` import errors are ignored and
                   `None` is returned instead.
    :return: imported object
    """
    dotted_name = import_name.replace(":", ".")

    try:
        # First try the whole name as a module.
        try:
            __import__(dotted_name)
        except ImportError:
            # A name without a dot has no parent to fall back on.
            if "." not in dotted_name:
                raise
        else:
            return sys.modules[dotted_name]

        # Otherwise treat the last segment as an attribute of its parent.
        parent_name, _, attr_name = dotted_name.rpartition(".")
        parent = __import__(parent_name, globals(), locals(), [attr_name])

        try:
            return getattr(parent, attr_name)
        except AttributeError as attr_error:
            raise ImportError(attr_error) from None

    except ImportError as import_error:
        if silent:
            return None

        raise ImportStringError(dotted_name, import_error).with_traceback(
            sys.exc_info()[2]
        ) from None
def find_modules(
    import_path: str, include_packages: bool = False, recursive: bool = False
) -> t.Iterator[str]:
    """Find all the modules below a package. This can be useful to
    automatically import all views / controllers so that their metaclasses /
    function decorators have a chance to register themselves on the
    application.

    Packages are not returned unless `include_packages` is `True`. This can
    also recursively list modules but in that case it will import all the
    packages to get the correct load path of that module.

    :param import_path: the dotted name for the package to find child modules.
    :param include_packages: set to `True` if packages should be returned, too.
    :param recursive: set to `True` if recursion should happen.
    :return: generator
    """
    package = import_string(import_path)
    search_path = getattr(package, "__path__", None)

    # Only packages have ``__path__``.
    if search_path is None:
        raise ValueError(f"{import_path!r} is not a package")

    prefix = f"{package.__name__}."

    for info in pkgutil.iter_modules(search_path):
        qualified_name = prefix + info.name

        if not info.ispkg:
            yield qualified_name
            continue

        if include_packages:
            yield qualified_name

        if recursive:
            yield from find_modules(qualified_name, include_packages, True)
class ImportStringError(ImportError):
    """Provides information about a failed :func:`import_string` attempt."""

    #: String in dotted notation that failed to be imported.
    import_name: str
    #: Wrapped exception.
    exception: BaseException

    def __init__(self, import_name: str, exception: BaseException) -> None:
        """Build the error message by importing each prefix of the name.

        Every dotted prefix of ``import_name`` is imported in turn. The
        first prefix that cannot be imported is reported as not found,
        after the list of prefixes that could be. If every prefix
        imports, the message is just ``import_name``.

        :param import_name: The name that failed to be imported.
        :param exception: The exception raised by the failed import.
        """
        self.import_name = import_name
        self.exception = exception
        msg = import_name
        name = ""
        tracked = []

        for part in import_name.replace(":", ".").split("."):
            name = f"{name}.{part}" if name else part
            imported = import_string(name, silent=True)

            # A silent import returns None on failure. Compare against None
            # rather than testing truthiness, so that an imported object that
            # is falsy (0, False, an empty container) still counts as found.
            if imported is not None:
                tracked.append((name, getattr(imported, "__file__", None)))
            else:
                track = [f"- {n!r} found in {i!r}." for n, i in tracked]
                track.append(f"- {name!r} not found.")
                track_str = "\n".join(track)
                msg = (
                    f"import_string() failed for {import_name!r}. Possible reasons"
                    f" are:\n\n"
                    "- missing __init__.py in a package;\n"
                    "- package or module path not included in sys.path;\n"
                    "- duplicated package or module name taking precedence in"
                    " sys.path;\n"
                    "- missing module, class, function or variable;\n\n"
                    f"Debugged import:\n\n{track_str}\n\n"
                    f"Original exception:\n\n{type(exception).__name__}: {exception}"
                )
                break

        super().__init__(msg)

    def __repr__(self) -> str:
        return f"<{type(self).__name__}({self.import_name!r}, {self.exception!r})>"