1from __future__ import annotations
2
3import mimetypes
4from io import BytesIO
5from os import fsdecode
6from os import fspath
7
8from .._internal import _plain_int
9from .structures import MultiDict
10
11
12class FileStorage:
13 """The :class:`FileStorage` class is a thin wrapper over incoming files.
14 It is used by the request object to represent uploaded files. All the
15 attributes of the wrapper stream are proxied by the file storage so
16 it's possible to do ``storage.read()`` instead of the long form
17 ``storage.stream.read()``.
18 """
19
20 def __init__(
21 self,
22 stream=None,
23 filename=None,
24 name=None,
25 content_type=None,
26 content_length=None,
27 headers=None,
28 ):
29 self.name = name
30 self.stream = stream or BytesIO()
31
32 # If no filename is provided, attempt to get the filename from
33 # the stream object. Python names special streams like
34 # ``<stderr>`` with angular brackets, skip these streams.
35 if filename is None:
36 filename = getattr(stream, "name", None)
37
38 if filename is not None:
39 filename = fsdecode(filename)
40
41 if filename and filename[0] == "<" and filename[-1] == ">":
42 filename = None
43 else:
44 filename = fsdecode(filename)
45
46 self.filename = filename
47
48 if headers is None:
49 from .headers import Headers
50
51 headers = Headers()
52 self.headers = headers
53 if content_type is not None:
54 headers["Content-Type"] = content_type
55 if content_length is not None:
56 headers["Content-Length"] = str(content_length)
57
58 def _parse_content_type(self):
59 if not hasattr(self, "_parsed_content_type"):
60 self._parsed_content_type = http.parse_options_header(self.content_type)
61
62 @property
63 def content_type(self):
64 """The content-type sent in the header. Usually not available"""
65 return self.headers.get("content-type")
66
67 @property
68 def content_length(self):
69 """The content-length sent in the header. Usually not available"""
70 if "content-length" in self.headers:
71 try:
72 return _plain_int(self.headers["content-length"])
73 except ValueError:
74 pass
75
76 return 0
77
78 @property
79 def mimetype(self):
80 """Like :attr:`content_type`, but without parameters (eg, without
81 charset, type etc.) and always lowercase. For example if the content
82 type is ``text/HTML; charset=utf-8`` the mimetype would be
83 ``'text/html'``.
84
85 .. versionadded:: 0.7
86 """
87 self._parse_content_type()
88 return self._parsed_content_type[0].lower()
89
90 @property
91 def mimetype_params(self):
92 """The mimetype parameters as dict. For example if the content
93 type is ``text/html; charset=utf-8`` the params would be
94 ``{'charset': 'utf-8'}``.
95
96 .. versionadded:: 0.7
97 """
98 self._parse_content_type()
99 return self._parsed_content_type[1]
100
101 def save(self, dst, buffer_size=16384):
102 """Save the file to a destination path or file object. If the
103 destination is a file object you have to close it yourself after the
104 call. The buffer size is the number of bytes held in memory during
105 the copy process. It defaults to 16KB.
106
107 For secure file saving also have a look at :func:`secure_filename`.
108
109 :param dst: a filename, :class:`os.PathLike`, or open file
110 object to write to.
111 :param buffer_size: Passed as the ``length`` parameter of
112 :func:`shutil.copyfileobj`.
113
114 .. versionchanged:: 1.0
115 Supports :mod:`pathlib`.
116 """
117 from shutil import copyfileobj
118
119 close_dst = False
120
121 if hasattr(dst, "__fspath__"):
122 dst = fspath(dst)
123
124 if isinstance(dst, str):
125 dst = open(dst, "wb")
126 close_dst = True
127
128 try:
129 copyfileobj(self.stream, dst, buffer_size)
130 finally:
131 if close_dst:
132 dst.close()
133
134 def close(self):
135 """Close the underlying file if possible."""
136 try:
137 self.stream.close()
138 except Exception:
139 pass
140
141 def __bool__(self):
142 return bool(self.filename)
143
144 def __getattr__(self, name):
145 try:
146 return getattr(self.stream, name)
147 except AttributeError:
148 # SpooledTemporaryFile doesn't implement IOBase, get the
149 # attribute from its backing file instead.
150 # https://github.com/python/cpython/pull/3249
151 if hasattr(self.stream, "_file"):
152 return getattr(self.stream._file, name)
153 raise
154
155 def __iter__(self):
156 return iter(self.stream)
157
158 def __repr__(self):
159 return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"
160
161
162class FileMultiDict(MultiDict):
163 """A special :class:`MultiDict` that has convenience methods to add
164 files to it. This is used for :class:`EnvironBuilder` and generally
165 useful for unittesting.
166
167 .. versionadded:: 0.5
168 """
169
170 def add_file(self, name, file, filename=None, content_type=None):
171 """Adds a new file to the dict. `file` can be a file name or
172 a :class:`file`-like or a :class:`FileStorage` object.
173
174 :param name: the name of the field.
175 :param file: a filename or :class:`file`-like object
176 :param filename: an optional filename
177 :param content_type: an optional content type
178 """
179 if isinstance(file, FileStorage):
180 value = file
181 else:
182 if isinstance(file, str):
183 if filename is None:
184 filename = file
185 file = open(file, "rb")
186 if filename and content_type is None:
187 content_type = (
188 mimetypes.guess_type(filename)[0] or "application/octet-stream"
189 )
190 value = FileStorage(file, filename, name, content_type)
191
192 self.add(name, value)
193
194
195# circular dependencies
196from .. import http