Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/datastructures/file_storage.py: 29%
92 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
1from __future__ import annotations
3import mimetypes
4from io import BytesIO
5from os import fsdecode
6from os import fspath
8from .._internal import _plain_int
9from .structures import MultiDict
class FileStorage:
    """The :class:`FileStorage` class is a thin wrapper over incoming files.
    It is used by the request object to represent uploaded files.  All the
    attributes of the wrapper stream are proxied by the file storage so
    it's possible to do ``storage.read()`` instead of the long form
    ``storage.stream.read()``.
    """

    def __init__(
        self,
        stream=None,
        filename=None,
        name=None,
        content_type=None,
        content_length=None,
        headers=None,
    ):
        """Wrap ``stream`` and record the metadata that came with it.

        :param stream: the underlying file object. A new empty
            :class:`~io.BytesIO` is used when this is ``None`` or falsy.
        :param filename: the file name. When ``None``, the ``name``
            attribute of ``stream`` is used if it has one.
        :param name: stored unchanged as :attr:`name`.
        :param content_type: if not ``None``, written to the
            ``Content-Type`` header.
        :param content_length: if not ``None``, written (as a string) to
            the ``Content-Length`` header.
        :param headers: the header container to use. A new ``Headers``
            object is created when this is ``None``.
        """
        self.name = name
        self.stream = stream or BytesIO()

        # If no filename is provided, attempt to get the filename from
        # the stream object. Python names special streams like
        # ``<stderr>`` with angular brackets, skip these streams.
        if filename is None:
            filename = getattr(stream, "name", None)

            if filename is not None:
                filename = fsdecode(filename)

            if filename and filename[0] == "<" and filename[-1] == ">":
                filename = None
        else:
            filename = fsdecode(filename)

        self.filename = filename

        if headers is None:
            # Imported here rather than at module level, as in the
            # original code.
            from .headers import Headers

            headers = Headers()
        self.headers = headers
        if content_type is not None:
            headers["Content-Type"] = content_type
        if content_length is not None:
            headers["Content-Length"] = str(content_length)

    def _parse_content_type(self):
        """Parse :attr:`content_type` once and cache the result.

        The parsed value is stored on ``_parsed_content_type`` the first
        time this is called; later calls reuse it, so header changes made
        afterwards are not reflected.
        """
        if not hasattr(self, "_parsed_content_type"):
            self._parsed_content_type = http.parse_options_header(self.content_type)

    @property
    def content_type(self):
        """The content-type sent in the header.  Usually not available"""
        return self.headers.get("content-type")

    @property
    def content_length(self) -> int:
        """The content-length sent in the header.  Usually not available.

        Returns ``0`` when the header is missing or cannot be converted
        to an integer.
        """
        if "content-length" in self.headers:
            try:
                return _plain_int(self.headers["content-length"])
            except ValueError:
                pass

        return 0

    @property
    def mimetype(self):
        """Like :attr:`content_type`, but without parameters (eg, without
        charset, type etc.) and always lowercase.  For example if the content
        type is ``text/HTML; charset=utf-8`` the mimetype would be
        ``'text/html'``.

        .. versionadded:: 0.7
        """
        self._parse_content_type()
        return self._parsed_content_type[0].lower()

    @property
    def mimetype_params(self):
        """The mimetype parameters as dict.  For example if the content
        type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.7
        """
        self._parse_content_type()
        return self._parsed_content_type[1]

    def save(self, dst, buffer_size: int = 16384) -> None:
        """Save the file to a destination path or file object.  If the
        destination is a file object you have to close it yourself after the
        call.  The buffer size is the number of bytes held in memory during
        the copy process.  It defaults to 16KB.

        For secure file saving also have a look at :func:`secure_filename`.

        :param dst: a filename, :class:`os.PathLike`, or open file
            object to write to.
        :param buffer_size: Passed as the ``length`` parameter of
            :func:`shutil.copyfileobj`.

        .. versionchanged:: 1.0
            Supports :mod:`pathlib`.
        """
        from shutil import copyfileobj

        # Only a file opened here is closed here; a file object passed in
        # by the caller stays open.
        close_dst = False

        if hasattr(dst, "__fspath__"):
            dst = fspath(dst)

        if isinstance(dst, str):
            dst = open(dst, "wb")
            close_dst = True

        try:
            copyfileobj(self.stream, dst, buffer_size)
        finally:
            if close_dst:
                dst.close()

    def close(self) -> None:
        """Close the underlying file if possible."""
        # Best effort: any error raised while closing is ignored.
        try:
            self.stream.close()
        except Exception:
            pass

    def __bool__(self) -> bool:
        """A storage is truthy only when it has a non-empty filename."""
        return bool(self.filename)

    def __getattr__(self, name):
        """Proxy attributes that are not found on the storage to the stream.

        :raises AttributeError: if neither the stream nor (when present)
            its ``_file`` backing object has the attribute, or if the
            storage has no ``stream`` at all.
        """
        # ``__getattr__`` only runs when normal lookup fails. If ``stream``
        # itself is the missing attribute (for example on an instance
        # created without running ``__init__``), reading ``self.stream``
        # below would call this method again without end. Fail with a
        # regular AttributeError instead so ``hasattr``/``getattr`` with a
        # default keep working.
        if name == "stream":
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )

        stream = self.stream

        try:
            return getattr(stream, name)
        except AttributeError:
            # SpooledTemporaryFile doesn't implement IOBase, get the
            # attribute from its backing file instead.
            # https://github.com/python/cpython/pull/3249
            if hasattr(stream, "_file"):
                return getattr(stream._file, name)
            raise

    def __iter__(self):
        """Iterate over the underlying stream."""
        return iter(self.stream)

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"
class FileMultiDict(MultiDict):
    """A special :class:`MultiDict` that has convenience methods to add
    files to it.  This is used for :class:`EnvironBuilder` and generally
    useful for unittesting.

    .. versionadded:: 0.5
    """

    def add_file(self, name, file, filename=None, content_type=None):
        """Adds a new file to the dict.  `file` can be a file name, an
        :class:`os.PathLike` object, a :class:`file`-like object or a
        :class:`FileStorage` object.

        A :class:`FileStorage` is added as-is; ``filename`` and
        ``content_type`` are ignored in that case.  A path is opened in
        binary read mode and the open file is handed to the new
        :class:`FileStorage`; it is not closed here.

        :param name: the name of the field.
        :param file: a filename, :class:`os.PathLike`, or
            :class:`file`-like object
        :param filename: an optional filename.  Defaults to the path when
            `file` is a path.
        :param content_type: an optional content type.  When omitted and a
            filename is known, it is guessed from the filename and falls
            back to ``application/octet-stream``.
        """
        if isinstance(file, FileStorage):
            value = file
        else:
            # Accept path-like objects (e.g. ``pathlib.Path``) the same way
            # ``FileStorage.save`` does, instead of passing the unopened
            # path object on as if it were a stream.
            if hasattr(file, "__fspath__"):
                file = fsdecode(file)

            if isinstance(file, str):
                if filename is None:
                    filename = file
                file = open(file, "rb")

            if filename and content_type is None:
                content_type = (
                    mimetypes.guess_type(filename)[0] or "application/octet-stream"
                )

            value = FileStorage(file, filename, name, content_type)

        self.add(name, value)
195# circular dependencies
196from .. import http