1from __future__ import annotations
2
3import collections.abc as cabc
4import mimetypes
5import os
6import typing as t
7from io import BytesIO
8from os import fsdecode
9from os import fspath
10
11from .._internal import _plain_int
12from .headers import Headers
13from .structures import MultiDict
14
15
16class FileStorage:
17 """The :class:`FileStorage` class is a thin wrapper over incoming files.
18 It is used by the request object to represent uploaded files. All the
19 attributes of the wrapper stream are proxied by the file storage so
20 it's possible to do ``storage.read()`` instead of the long form
21 ``storage.stream.read()``.
22 """
23
24 def __init__(
25 self,
26 stream: t.IO[bytes] | None = None,
27 filename: str | None = None,
28 name: str | None = None,
29 content_type: str | None = None,
30 content_length: int | None = None,
31 headers: Headers | None = None,
32 ):
33 self.name = name
34 self.stream = stream or BytesIO()
35
36 # If no filename is provided, attempt to get the filename from
37 # the stream object. Python names special streams like
38 # ``<stderr>`` with angular brackets, skip these streams.
39 if filename is None:
40 filename = getattr(stream, "name", None)
41
42 if filename is not None:
43 filename = fsdecode(filename)
44
45 if filename and filename[0] == "<" and filename[-1] == ">":
46 filename = None
47 else:
48 filename = fsdecode(filename)
49
50 self.filename = filename
51
52 if headers is None:
53 headers = Headers()
54 self.headers = headers
55 if content_type is not None:
56 headers["Content-Type"] = content_type
57 if content_length is not None:
58 headers["Content-Length"] = str(content_length)
59
60 def _parse_content_type(self) -> None:
61 if not hasattr(self, "_parsed_content_type"):
62 self._parsed_content_type = http.parse_options_header(self.content_type)
63
64 @property
65 def content_type(self) -> str | None:
66 """The content-type sent in the header. Usually not available"""
67 return self.headers.get("content-type")
68
69 @property
70 def content_length(self) -> int:
71 """The content-length sent in the header. Usually not available"""
72 if "content-length" in self.headers:
73 try:
74 return _plain_int(self.headers["content-length"])
75 except ValueError:
76 pass
77
78 return 0
79
80 @property
81 def mimetype(self) -> str:
82 """Like :attr:`content_type`, but without parameters (eg, without
83 charset, type etc.) and always lowercase. For example if the content
84 type is ``text/HTML; charset=utf-8`` the mimetype would be
85 ``'text/html'``.
86
87 .. versionadded:: 0.7
88 """
89 self._parse_content_type()
90 return self._parsed_content_type[0].lower()
91
92 @property
93 def mimetype_params(self) -> dict[str, str]:
94 """The mimetype parameters as dict. For example if the content
95 type is ``text/html; charset=utf-8`` the params would be
96 ``{'charset': 'utf-8'}``.
97
98 .. versionadded:: 0.7
99 """
100 self._parse_content_type()
101 return self._parsed_content_type[1]
102
103 def save(
104 self, dst: str | os.PathLike[str] | t.IO[bytes], buffer_size: int = 16384
105 ) -> None:
106 """Save the file to a destination path or file object. If the
107 destination is a file object you have to close it yourself after the
108 call. The buffer size is the number of bytes held in memory during
109 the copy process. It defaults to 16KB.
110
111 For secure file saving also have a look at :func:`secure_filename`.
112
113 :param dst: a filename, :class:`os.PathLike`, or open file
114 object to write to.
115 :param buffer_size: Passed as the ``length`` parameter of
116 :func:`shutil.copyfileobj`.
117
118 .. versionchanged:: 1.0
119 Supports :mod:`pathlib`.
120 """
121 from shutil import copyfileobj
122
123 close_dst = False
124
125 if hasattr(dst, "__fspath__"):
126 dst = fspath(dst)
127
128 if isinstance(dst, str):
129 dst = open(dst, "wb")
130 close_dst = True
131
132 try:
133 copyfileobj(self.stream, dst, buffer_size)
134 finally:
135 if close_dst:
136 dst.close()
137
138 def close(self) -> None:
139 """Close the underlying file if possible."""
140 try:
141 self.stream.close()
142 except Exception:
143 pass
144
145 def __bool__(self) -> bool:
146 return bool(self.filename)
147
148 def __getattr__(self, name: str) -> t.Any:
149 try:
150 return getattr(self.stream, name)
151 except AttributeError:
152 # SpooledTemporaryFile on Python < 3.11 doesn't implement IOBase,
153 # get the attribute from its backing file instead.
154 if hasattr(self.stream, "_file"):
155 return getattr(self.stream._file, name)
156 raise
157
158 def __iter__(self) -> cabc.Iterator[bytes]:
159 return iter(self.stream)
160
161 def __repr__(self) -> str:
162 return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"
163
164
165class FileMultiDict(MultiDict[str, FileStorage]):
166 """A special :class:`MultiDict` that has convenience methods to add
167 files to it. This is used for :class:`EnvironBuilder` and generally
168 useful for unittesting.
169
170 .. versionadded:: 0.5
171 """
172
173 def add_file(
174 self,
175 name: str,
176 file: str | os.PathLike[str] | t.IO[bytes] | FileStorage,
177 filename: str | None = None,
178 content_type: str | None = None,
179 ) -> None:
180 """Adds a new file to the dict. `file` can be a file name or
181 a :class:`file`-like or a :class:`FileStorage` object.
182
183 :param name: the name of the field.
184 :param file: a filename or :class:`file`-like object
185 :param filename: an optional filename
186 :param content_type: an optional content type
187 """
188 if isinstance(file, FileStorage):
189 self.add(name, file)
190 return
191
192 if isinstance(file, (str, os.PathLike)):
193 if filename is None:
194 filename = os.fspath(file)
195
196 file_obj: t.IO[bytes] = open(file, "rb")
197 else:
198 file_obj = file # type: ignore[assignment]
199
200 if filename and content_type is None:
201 content_type = (
202 mimetypes.guess_type(filename)[0] or "application/octet-stream"
203 )
204
205 self.add(name, FileStorage(file_obj, filename, name, content_type))
206
207
208# circular dependencies
209from .. import http