1from __future__ import annotations
2
3import typing as t
4from io import BytesIO
5from urllib.parse import parse_qsl
6
7from ._internal import _plain_int
8from .datastructures import FileStorage
9from .datastructures import Headers
10from .datastructures import MultiDict
11from .exceptions import RequestEntityTooLarge
12from .http import parse_options_header
13from .sansio.multipart import Data
14from .sansio.multipart import Epilogue
15from .sansio.multipart import Field
16from .sansio.multipart import File
17from .sansio.multipart import MultipartDecoder
18from .sansio.multipart import NeedData
19from .wsgi import get_content_length
20from .wsgi import get_input_stream
21
22# there are some platforms where SpooledTemporaryFile is not available.
23# In that case we need to provide a fallback.
24try:
25 from tempfile import SpooledTemporaryFile
26except ImportError:
27 from tempfile import TemporaryFile
28
29 SpooledTemporaryFile = None # type: ignore
30
31if t.TYPE_CHECKING:
32 import typing as te
33
34 from _typeshed.wsgi import WSGIEnvironment
35
36 t_parse_result = tuple[
37 t.IO[bytes], MultiDict[str, str], MultiDict[str, FileStorage]
38 ]
39
40 class TStreamFactory(te.Protocol):
41 def __call__(
42 self,
43 total_content_length: int | None,
44 content_type: str | None,
45 filename: str | None,
46 content_length: int | None = None,
47 ) -> t.IO[bytes]: ...
48
49
50F = t.TypeVar("F", bound=t.Callable[..., t.Any])
51
52
53def default_stream_factory(
54 total_content_length: int | None,
55 content_type: str | None,
56 filename: str | None,
57 content_length: int | None = None,
58) -> t.IO[bytes]:
59 max_size = 1024 * 500
60
61 if SpooledTemporaryFile is not None:
62 return t.cast(t.IO[bytes], SpooledTemporaryFile(max_size=max_size, mode="rb+"))
63 elif total_content_length is None or total_content_length > max_size:
64 return t.cast(t.IO[bytes], TemporaryFile("rb+"))
65
66 return BytesIO()
67
68
69def parse_form_data(
70 environ: WSGIEnvironment,
71 stream_factory: TStreamFactory | None = None,
72 max_form_memory_size: int | None = None,
73 max_content_length: int | None = None,
74 cls: type[MultiDict[str, t.Any]] | None = None,
75 silent: bool = True,
76 *,
77 max_form_parts: int | None = None,
78) -> t_parse_result:
79 """Parse the form data in the environ and return it as tuple in the form
80 ``(stream, form, files)``. You should only call this method if the
81 transport method is `POST`, `PUT`, or `PATCH`.
82
83 If the mimetype of the data transmitted is `multipart/form-data` the
84 files multidict will be filled with `FileStorage` objects. If the
85 mimetype is unknown the input stream is wrapped and returned as first
86 argument, else the stream is empty.
87
88 This is a shortcut for the common usage of :class:`FormDataParser`.
89
90 :param environ: the WSGI environment to be used for parsing.
91 :param stream_factory: An optional callable that returns a new read and
92 writeable file descriptor. This callable works
93 the same as :meth:`Response._get_file_stream`.
94 :param max_form_memory_size: the maximum number of bytes to be accepted for
95 in-memory stored form data. If the data
96 exceeds the value specified an
97 :exc:`~exceptions.RequestEntityTooLarge`
98 exception is raised.
99 :param max_content_length: If this is provided and the transmitted data
100 is longer than this value an
101 :exc:`~exceptions.RequestEntityTooLarge`
102 exception is raised.
103 :param cls: an optional dict class to use. If this is not specified
104 or `None` the default :class:`MultiDict` is used.
105 :param silent: If set to False parsing errors will not be caught.
106 :param max_form_parts: The maximum number of multipart parts to be parsed. If this
107 is exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
108 :return: A tuple in the form ``(stream, form, files)``.
109
110 .. versionchanged:: 3.0
111 The ``charset`` and ``errors`` parameters were removed.
112
113 .. versionchanged:: 2.3
114 Added the ``max_form_parts`` parameter.
115
116 .. versionadded:: 0.5.1
117 Added the ``silent`` parameter.
118
119 .. versionadded:: 0.5
120 Added the ``max_form_memory_size``, ``max_content_length``, and ``cls``
121 parameters.
122 """
123 return FormDataParser(
124 stream_factory=stream_factory,
125 max_form_memory_size=max_form_memory_size,
126 max_content_length=max_content_length,
127 max_form_parts=max_form_parts,
128 silent=silent,
129 cls=cls,
130 ).parse_from_environ(environ)
131
132
133class FormDataParser:
134 """This class implements parsing of form data for Werkzeug. By itself
135 it can parse multipart and url encoded form data. It can be subclassed
136 and extended but for most mimetypes it is a better idea to use the
137 untouched stream and expose it as separate attributes on a request
138 object.
139
140 :param stream_factory: An optional callable that returns a new read and
141 writeable file descriptor. This callable works
142 the same as :meth:`Response._get_file_stream`.
143 :param max_form_memory_size: the maximum number of bytes to be accepted for
144 in-memory stored form data. If the data
145 exceeds the value specified an
146 :exc:`~exceptions.RequestEntityTooLarge`
147 exception is raised.
148 :param max_content_length: If this is provided and the transmitted data
149 is longer than this value an
150 :exc:`~exceptions.RequestEntityTooLarge`
151 exception is raised.
152 :param cls: an optional dict class to use. If this is not specified
153 or `None` the default :class:`MultiDict` is used.
154 :param silent: If set to False parsing errors will not be caught.
155 :param max_form_parts: The maximum number of multipart parts to be parsed. If this
156 is exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
157
158 .. versionchanged:: 3.0
159 The ``charset`` and ``errors`` parameters were removed.
160
161 .. versionchanged:: 3.0
162 The ``parse_functions`` attribute and ``get_parse_func`` methods were removed.
163
164 .. versionchanged:: 2.2.3
165 Added the ``max_form_parts`` parameter.
166
167 .. versionadded:: 0.8
168 """
169
170 def __init__(
171 self,
172 stream_factory: TStreamFactory | None = None,
173 max_form_memory_size: int | None = None,
174 max_content_length: int | None = None,
175 cls: type[MultiDict[str, t.Any]] | None = None,
176 silent: bool = True,
177 *,
178 max_form_parts: int | None = None,
179 ) -> None:
180 if stream_factory is None:
181 stream_factory = default_stream_factory
182
183 self.stream_factory = stream_factory
184 self.max_form_memory_size = max_form_memory_size
185 self.max_content_length = max_content_length
186 self.max_form_parts = max_form_parts
187
188 if cls is None:
189 cls = t.cast("type[MultiDict[str, t.Any]]", MultiDict)
190
191 self.cls = cls
192 self.silent = silent
193
194 def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
195 """Parses the information from the environment as form data.
196
197 :param environ: the WSGI environment to be used for parsing.
198 :return: A tuple in the form ``(stream, form, files)``.
199 """
200 stream = get_input_stream(environ, max_content_length=self.max_content_length)
201 content_length = get_content_length(environ)
202 mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
203 return self.parse(
204 stream,
205 content_length=content_length,
206 mimetype=mimetype,
207 options=options,
208 )
209
210 def parse(
211 self,
212 stream: t.IO[bytes],
213 mimetype: str,
214 content_length: int | None,
215 options: dict[str, str] | None = None,
216 ) -> t_parse_result:
217 """Parses the information from the given stream, mimetype,
218 content length and mimetype parameters.
219
220 :param stream: an input stream
221 :param mimetype: the mimetype of the data
222 :param content_length: the content length of the incoming data
223 :param options: optional mimetype parameters (used for
224 the multipart boundary for instance)
225 :return: A tuple in the form ``(stream, form, files)``.
226
227 .. versionchanged:: 3.0
228 The invalid ``application/x-url-encoded`` content type is not
229 treated as ``application/x-www-form-urlencoded``.
230 """
231 if mimetype == "multipart/form-data":
232 parse_func = self._parse_multipart
233 elif mimetype == "application/x-www-form-urlencoded":
234 parse_func = self._parse_urlencoded
235 else:
236 return stream, self.cls(), self.cls()
237
238 if options is None:
239 options = {}
240
241 try:
242 return parse_func(stream, mimetype, content_length, options)
243 except ValueError:
244 if not self.silent:
245 raise
246
247 return stream, self.cls(), self.cls()
248
249 def _parse_multipart(
250 self,
251 stream: t.IO[bytes],
252 mimetype: str,
253 content_length: int | None,
254 options: dict[str, str],
255 ) -> t_parse_result:
256 parser = MultiPartParser(
257 stream_factory=self.stream_factory,
258 max_form_memory_size=self.max_form_memory_size,
259 max_form_parts=self.max_form_parts,
260 cls=self.cls,
261 )
262 boundary = options.get("boundary", "").encode("ascii")
263
264 if not boundary:
265 raise ValueError("Missing boundary")
266
267 form, files = parser.parse(stream, boundary, content_length)
268 return stream, form, files
269
270 def _parse_urlencoded(
271 self,
272 stream: t.IO[bytes],
273 mimetype: str,
274 content_length: int | None,
275 options: dict[str, str],
276 ) -> t_parse_result:
277 if (
278 self.max_form_memory_size is not None
279 and content_length is not None
280 and content_length > self.max_form_memory_size
281 ):
282 raise RequestEntityTooLarge()
283
284 items = parse_qsl(
285 stream.read().decode(),
286 keep_blank_values=True,
287 errors="werkzeug.url_quote",
288 )
289 return stream, self.cls(items), self.cls()
290
291
292class MultiPartParser:
293 def __init__(
294 self,
295 stream_factory: TStreamFactory | None = None,
296 max_form_memory_size: int | None = None,
297 cls: type[MultiDict[str, t.Any]] | None = None,
298 buffer_size: int = 64 * 1024,
299 max_form_parts: int | None = None,
300 ) -> None:
301 self.max_form_memory_size = max_form_memory_size
302 self.max_form_parts = max_form_parts
303
304 if stream_factory is None:
305 stream_factory = default_stream_factory
306
307 self.stream_factory = stream_factory
308
309 if cls is None:
310 cls = t.cast("type[MultiDict[str, t.Any]]", MultiDict)
311
312 self.cls = cls
313 self.buffer_size = buffer_size
314
315 def fail(self, message: str) -> te.NoReturn:
316 raise ValueError(message)
317
318 def get_part_charset(self, headers: Headers) -> str:
319 # Figure out input charset for current part
320 content_type = headers.get("content-type")
321
322 if content_type:
323 parameters = parse_options_header(content_type)[1]
324 ct_charset = parameters.get("charset", "").lower()
325
326 # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
327 # This list will not be extended further.
328 if ct_charset in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
329 return ct_charset
330
331 return "utf-8"
332
333 def start_file_streaming(
334 self, event: File, total_content_length: int | None
335 ) -> t.IO[bytes]:
336 content_type = event.headers.get("content-type")
337
338 try:
339 content_length = _plain_int(event.headers["content-length"])
340 except (KeyError, ValueError):
341 content_length = 0
342
343 container = self.stream_factory(
344 total_content_length=total_content_length,
345 filename=event.filename,
346 content_type=content_type,
347 content_length=content_length,
348 )
349 return container
350
351 def parse(
352 self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
353 ) -> tuple[MultiDict[str, str], MultiDict[str, FileStorage]]:
354 current_part: Field | File
355 field_size: int | None = None
356 container: t.IO[bytes] | list[bytes]
357 _write: t.Callable[[bytes], t.Any]
358
359 parser = MultipartDecoder(
360 boundary,
361 max_form_memory_size=self.max_form_memory_size,
362 max_parts=self.max_form_parts,
363 )
364
365 fields = []
366 files = []
367
368 for data in _chunk_iter(stream.read, self.buffer_size):
369 parser.receive_data(data)
370 event = parser.next_event()
371 while not isinstance(event, (Epilogue, NeedData)):
372 if isinstance(event, Field):
373 current_part = event
374 field_size = 0
375 container = []
376 _write = container.append
377 elif isinstance(event, File):
378 current_part = event
379 field_size = None
380 container = self.start_file_streaming(event, content_length)
381 _write = container.write
382 elif isinstance(event, Data):
383 if self.max_form_memory_size is not None and field_size is not None:
384 # Ensure that accumulated data events do not exceed limit.
385 # Also checked within single event in MultipartDecoder.
386 field_size += len(event.data)
387
388 if field_size > self.max_form_memory_size:
389 raise RequestEntityTooLarge()
390
391 _write(event.data)
392 if not event.more_data:
393 if isinstance(current_part, Field):
394 value = b"".join(container).decode(
395 self.get_part_charset(current_part.headers), "replace"
396 )
397 fields.append((current_part.name, value))
398 else:
399 container = t.cast(t.IO[bytes], container)
400 container.seek(0)
401 files.append(
402 (
403 current_part.name,
404 FileStorage(
405 container,
406 current_part.filename,
407 current_part.name,
408 headers=current_part.headers,
409 ),
410 )
411 )
412
413 event = parser.next_event()
414
415 return self.cls(fields), self.cls(files)
416
417
418def _chunk_iter(read: t.Callable[[int], bytes], size: int) -> t.Iterator[bytes | None]:
419 """Read data in chunks for multipart/form-data parsing. Stop if no data is read.
420 Yield ``None`` at the end to signal end of parsing.
421 """
422 while True:
423 data = read(size)
424
425 if not data:
426 break
427
428 yield data
429
430 yield None