1from __future__ import annotations
2
3import collections.abc as cabc
4import mimetypes
5import os
6import typing as t
7from io import BytesIO
8from os import fsdecode
9from os import fspath
10
11from .._internal import _plain_int
12from ..http import parse_options_header
13from .headers import Headers
14from .structures import MultiDict
15
16
17class FileStorage:
18 """The :class:`FileStorage` class is a thin wrapper over incoming files.
19 It is used by the request object to represent uploaded files. All the
20 attributes of the wrapper stream are proxied by the file storage so
21 it's possible to do ``storage.read()`` instead of the long form
22 ``storage.stream.read()``.
23 """
24
25 def __init__(
26 self,
27 stream: t.IO[bytes] | None = None,
28 filename: str | None = None,
29 name: str | None = None,
30 content_type: str | None = None,
31 content_length: int | None = None,
32 headers: Headers | None = None,
33 ):
34 self.name = name
35 self.stream = stream or BytesIO()
36 self.filename = _guess_filename(self.stream, filename)
37
38 if headers is None:
39 headers = Headers()
40 self.headers = headers
41 if content_type is not None:
42 headers["Content-Type"] = content_type
43 if content_length is not None:
44 headers["Content-Length"] = str(content_length)
45
46 def _parse_content_type(self) -> None:
47 if not hasattr(self, "_parsed_content_type"):
48 self._parsed_content_type = parse_options_header(self.content_type)
49
50 @property
51 def content_type(self) -> str | None:
52 """The content-type sent in the header. Usually not available"""
53 return self.headers.get("Content-Type")
54
55 @property
56 def content_length(self) -> int:
57 """The content-length sent in the header. Usually not available"""
58 if "Content-Length" in self.headers:
59 try:
60 return _plain_int(self.headers["Content-Length"])
61 except ValueError:
62 pass
63
64 return 0
65
66 @property
67 def mimetype(self) -> str:
68 """Like :attr:`content_type`, but without parameters (eg, without
69 charset, type etc.) and always lowercase. For example if the content
70 type is ``text/HTML; charset=utf-8`` the mimetype would be
71 ``'text/html'``.
72
73 .. versionadded:: 0.7
74 """
75 self._parse_content_type()
76 return self._parsed_content_type[0].lower()
77
78 @property
79 def mimetype_params(self) -> dict[str, str]:
80 """The mimetype parameters as dict. For example if the content
81 type is ``text/html; charset=utf-8`` the params would be
82 ``{'charset': 'utf-8'}``.
83
84 .. versionadded:: 0.7
85 """
86 self._parse_content_type()
87 return self._parsed_content_type[1]
88
89 def save(
90 self, dst: str | os.PathLike[str] | t.IO[bytes], buffer_size: int = 16384
91 ) -> None:
92 """Save the file to a destination path or file object. If the
93 destination is a file object you have to close it yourself after the
94 call. The buffer size is the number of bytes held in memory during
95 the copy process. It defaults to 16KB.
96
97 For secure file saving also have a look at :func:`secure_filename`.
98
99 :param dst: a filename, :class:`os.PathLike`, or open file
100 object to write to.
101 :param buffer_size: Passed as the ``length`` parameter of
102 :func:`shutil.copyfileobj`.
103
104 .. versionchanged:: 1.0
105 Supports :mod:`pathlib`.
106 """
107 from shutil import copyfileobj
108
109 close_dst = False
110
111 if hasattr(dst, "__fspath__"):
112 dst = fspath(dst)
113
114 if isinstance(dst, str):
115 dst = open(dst, "wb")
116 close_dst = True
117
118 try:
119 copyfileobj(self.stream, dst, buffer_size)
120 finally:
121 if close_dst:
122 dst.close()
123
124 def close(self) -> None:
125 """Close the underlying file if possible."""
126 try:
127 self.stream.close()
128 except Exception:
129 pass
130
131 def __bool__(self) -> bool:
132 return bool(self.filename)
133
134 def __getattr__(self, name: str) -> t.Any:
135 try:
136 return getattr(self.stream, name)
137 except AttributeError:
138 # SpooledTemporaryFile on Python < 3.11 doesn't implement IOBase,
139 # get the attribute from its backing file instead.
140 if hasattr(self.stream, "_file"):
141 return getattr(self.stream._file, name)
142 raise
143
144 def __iter__(self) -> cabc.Iterator[bytes]:
145 return iter(self.stream)
146
147 def __repr__(self) -> str:
148 return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"
149
150
151class FileMultiDict(MultiDict[str, FileStorage]):
152 """A :class:`MultiDict` for managing form data file values. Used by
153 :class:`.EnvironBuilder` for tests.
154
155 .. versionadded:: 0.5
156 """
157
158 def add_file(
159 self,
160 name: str,
161 file: str | os.PathLike[str] | t.IO[bytes] | FileStorage,
162 filename: str | None = None,
163 content_type: str | None = None,
164 ) -> None:
165 """Add a file to the given key. Can be passed a filename or IO object,
166 which will construct a :class:`.FileStorage` object.
167
168 :param name: The key to add the file to.
169 :param file: The file to add. Constructs a :class:`FileStorage` object
170 if the value is not one.
171 :param filename: The filename to set for the field. Defaults to ``file``
172 if it's a filename or ``file.name`` if it's an IO object.
173 :param content_type: The content type to set for the field. Defaults to
174 guessing based on the filename, falling back to
175 ``application/octet-stream``.
176
177 .. versionchanged:: 3.2
178 The filename is detected from an IO object.
179 """
180 if isinstance(file, FileStorage):
181 self.add(name, file)
182 return
183
184 if isinstance(file, (str, os.PathLike)):
185 if filename is None:
186 filename = os.fspath(file)
187
188 file_obj: t.IO[bytes] = open(file, "rb")
189 else:
190 file_obj = file # type: ignore[assignment]
191 filename = _guess_filename(file_obj, filename)
192
193 if filename is not None and content_type is None:
194 content_type = (
195 mimetypes.guess_type(filename)[0] or "application/octet-stream"
196 )
197
198 self.add(name, FileStorage(file_obj, filename, name, content_type))
199
200 def close(self) -> None:
201 """Call :meth:`~FileStorage.close` on every open file.
202
203 .. versionadded:: 3.2
204 """
205 for values in self.listvalues():
206 for value in values:
207 if not value.closed:
208 value.close()
209
210 def clear(self) -> None:
211 """Call :meth:`close`, then remove all items.
212
213 .. versionadded:: 3.2
214 """
215 self.close()
216 super().clear()
217
218
219def _guess_filename(stream: t.IO[t.Any], filename: str | None) -> str | None:
220 if filename is not None:
221 return fsdecode(filename)
222
223 filename = getattr(stream, "name", None)
224
225 if filename is not None:
226 filename = fsdecode(filename)
227
228 # Python names special streams like `<stderr>`, ignore these.
229 if filename[:1] == "<" and filename[-1:] == ">":
230 filename = None
231
232 return filename