Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/formparser.py: 67%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import typing as t
4from tempfile import SpooledTemporaryFile
5from types import TracebackType
6from urllib.parse import parse_qsl
8from ._internal import _plain_int
9from .datastructures import FileStorage
10from .datastructures import Headers
11from .datastructures import ImmutableMultiDict
12from .datastructures import MultiDict
13from .exceptions import RequestEntityTooLarge
14from .http import parse_options_header
15from .sansio.multipart import Data
16from .sansio.multipart import Epilogue
17from .sansio.multipart import Field
18from .sansio.multipart import File
19from .sansio.multipart import MultipartDecoder
20from .sansio.multipart import NeedData
21from .wsgi import get_content_length
22from .wsgi import get_input_stream
if t.TYPE_CHECKING:
    # Names in this branch exist only for static type checkers; they are
    # referenced solely from annotations elsewhere in the module.
    import typing_extensions as te
    from _typeshed.wsgi import WSGIEnvironment

    # Annotation for the ``(stream, form, files)`` tuple returned by the
    # parsing functions and methods in this module.
    t_parse_result = tuple[
        t.IO[bytes], MultiDict[str, str], MultiDict[str, FileStorage]
    ]

    class TStreamFactory(te.Protocol):
        # Call signature for ``stream_factory`` callables: given the total
        # request content length, the part's content type and filename, and
        # optionally the part's own content length, return a binary stream.
        def __call__(
            self,
            total_content_length: int | None,
            content_type: str | None,
            filename: str | None,
            content_length: int | None = None,
        ) -> t.IO[bytes]: ...


# Type variable bound to any callable.
# NOTE(review): not referenced in the visible part of this module — confirm
# whether it is still needed.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
def default_stream_factory(
    total_content_length: int | None,
    content_type: str | None,
    filename: str | None,
    content_length: int | None = None,
) -> t.IO[bytes]:
    """Create the stream used to hold an uploaded file by default.

    All four arguments are accepted so the function matches the stream
    factory call signature, but none of them is consulted here.

    :return: A new :class:`~tempfile.SpooledTemporaryFile` opened in
        ``rb+`` mode with a spool threshold of 500 KiB.
    """
    spool_threshold = 500 * 1024
    return SpooledTemporaryFile(max_size=spool_threshold, mode="rb+")
def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: TStreamFactory | None = None,
    max_form_memory_size: int | None = None,
    max_content_length: int | None = None,
    silent: bool = True,
    *,
    max_form_parts: int | None = None,
    **kwargs: t.Any,
) -> t_parse_result:
    """Parse the form data found in a WSGI environ.

    The result is a ``(stream, form, files)`` tuple. Only call this when the
    transport method is `POST`, `PUT`, or `PATCH`.

    For `multipart/form-data` the files multidict is filled with
    `FileStorage` objects. If the mimetype is unknown the input stream is
    wrapped and returned as the first item, else the stream is empty.

    This is a shortcut for the common usage of :class:`FormDataParser`: it
    builds a parser from the arguments and calls
    :meth:`FormDataParser.parse_from_environ`.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor. This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data. If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :return: A tuple in the form ``(stream, form, files)``.

    .. versionchanged:: 3.2
        The ``cls`` parameter is deprecated and will be removed in Werkzeug
        3.3. It will always be ``ImmutableMultiDict``.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.5.1
        Added the ``silent`` parameter.

    .. versionadded:: 0.5
        Added the ``max_form_memory_size``, ``max_content_length``, and
        ``cls`` parameters.
    """
    # Only ``cls`` is forwarded from ``**kwargs``; any other extra keyword
    # is ignored.
    deprecated_options: dict[str, t.Any] = {}

    if "cls" in kwargs:
        import warnings

        warnings.warn(
            "The 'cls' parameter is deprecated and will be removed in Werkzeug 3.3."
            " It will always be 'ImmutableMultiDict'.",
            DeprecationWarning,
            stacklevel=2,
        )
        deprecated_options["cls"] = kwargs["cls"]

    form_parser = FormDataParser(
        stream_factory=stream_factory,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        max_form_parts=max_form_parts,
        silent=silent,
        **deprecated_options,
    )
    return form_parser.parse_from_environ(environ)
class FormDataParser:
    """Form data parsing for Werkzeug.

    On its own this class handles multipart and url encoded form data. It
    can be subclassed and extended, but for most mimetypes it is a better
    idea to use the untouched stream and expose it as separate attributes on
    a request object.

    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor. This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data. If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.

    .. versionchanged:: 3.2
        The ``cls`` parameter and attribute are deprecated and will be
        removed in Werkzeug 3.3. They will always be ``ImmutableMultiDict``.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 3.0
        The ``parse_functions`` attribute and ``get_parse_func`` methods
        were removed.

    .. versionchanged:: 2.2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.8
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        max_content_length: int | None = None,
        silent: bool = True,
        *,
        max_form_parts: int | None = None,
        **kwargs: t.Any,
    ) -> None:
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts

        if "cls" in kwargs:
            import warnings

            warnings.warn(
                "The 'cls' parameter is deprecated and will be removed in Werkzeug 3.3."
                " It will always be 'ImmutableMultiDict'.",
                DeprecationWarning,
                stacklevel=2,
            )

        # ``None`` means "use ImmutableMultiDict" wherever a result is built.
        self.cls: type[ImmutableMultiDict[str, t.Any]] | None = kwargs.get("cls")
        self.silent = silent

    def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
        """Read the stream, length and content type from ``environ`` and
        parse them as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        body = get_input_stream(environ, max_content_length=self.max_content_length)
        length = get_content_length(environ)
        mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
        return self.parse(
            body, content_length=length, mimetype=mimetype, options=options
        )

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str] | None = None,
    ) -> t_parse_result:
        """Parse ``stream`` according to ``mimetype``.

        Only ``multipart/form-data`` and
        ``application/x-www-form-urlencoded`` are parsed. For any other
        mimetype, or when parsing raises :exc:`ValueError` while ``silent``
        is true, the stream is returned together with two empty multidicts.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for the multipart
            boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.

        .. versionchanged:: 3.0
            The invalid ``application/x-url-encoded`` content type is not
            treated as ``application/x-www-form-urlencoded``.
        """
        if mimetype == "multipart/form-data":
            handler = self._parse_multipart
        elif mimetype == "application/x-www-form-urlencoded":
            handler = self._parse_urlencoded
        else:
            handler = None

        if handler is not None:
            handler_options = {} if options is None else options

            try:
                return handler(stream, mimetype, content_length, handler_options)
            except ValueError:
                if not self.silent:
                    raise

        # Unsupported mimetype or silenced parse error: empty form and files.
        result_type = ImmutableMultiDict if self.cls is None else self.cls
        return stream, result_type(), result_type()

    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse a ``multipart/form-data`` body with :class:`MultiPartParser`.

        :raises ValueError: if ``options`` has no non-empty ``boundary``.
        """
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        parser_options: dict[str, t.Any] = {
            "stream_factory": self.stream_factory,
            "max_form_memory_size": self.max_form_memory_size,
            "max_form_parts": self.max_form_parts,
        }

        # Forward ``cls`` only when it was given, so the nested parser does
        # not see the deprecated argument otherwise.
        if self.cls is not None:
            parser_options["cls"] = self.cls

        with MultiPartParser(**parser_options) as multipart:
            form, files = multipart.parse(stream, boundary, content_length)

        return stream, form, files

    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse an ``application/x-www-form-urlencoded`` body.

        :raises RequestEntityTooLarge: if both ``max_form_memory_size`` and
            ``content_length`` are set and the length exceeds the limit.
        """
        limit = self.max_form_memory_size

        if limit is not None and content_length is not None:
            if content_length > limit:
                raise RequestEntityTooLarge()

        body = stream.read().decode()
        pairs = parse_qsl(
            body,
            keep_blank_values=True,
            errors="werkzeug.url_quote",
        )
        result_type = ImmutableMultiDict if self.cls is None else self.cls
        return stream, result_type(pairs), result_type()
class MultiPartParser:
    """Parse a ``multipart/form-data`` body read from a stream.

    Decoding is delegated to :class:`MultipartDecoder`; this class collects
    the resulting events into form fields and uploaded files. It can be used
    as a context manager, in which case the file streams it created are
    closed if the ``with`` block exits with an exception.
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: int | None = None,
        **kwargs: t.Any,
    ) -> None:
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts
        self.buffer_size = buffer_size
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )
        # Every stream created by ``start_file_streaming``; see ``__exit__``.
        self._files: list[t.IO[bytes]] = []

        if "cls" in kwargs:
            import warnings

            warnings.warn(
                "The 'cls' parameter is deprecated and will be removed in Werkzeug 3.3."
                " It will always be 'ImmutableMultiDict'.",
                DeprecationWarning,
                stacklevel=2,
            )

        # ``None`` means results are built with ``ImmutableMultiDict``.
        self.cls: type[ImmutableMultiDict[str, t.Any]] | None = kwargs.get("cls")

    def __enter__(self) -> te.Self:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the file streams created so far, but only on error."""
        if exc_val is None:
            return

        for opened in self._files:
            opened.close()

    def get_part_charset(self, headers: Headers) -> str:
        """Return the charset to decode a part with, defaulting to UTF-8."""
        content_type = headers.get("Content-Type")

        if not content_type:
            return "utf-8"

        _, parameters = parse_options_header(content_type)
        declared = parameters.get("charset", "").lower()

        # A safe list of encodings. Modern clients should only send ASCII or
        # UTF-8. This list will not be extended further.
        if declared in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
            return declared

        return "utf-8"

    def start_file_streaming(
        self, event: File, total_content_length: int | None
    ) -> t.IO[bytes]:
        """Create the stream that receives the data of a file part.

        The stream comes from ``stream_factory`` and is recorded so that
        ``__exit__`` can close it.
        """
        content_type = event.headers.get("Content-Type")

        # A missing or non-integer ``Content-Length`` header counts as 0.
        try:
            part_length = _plain_int(event.headers["Content-Length"])
        except (KeyError, ValueError):
            part_length = 0

        target = self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=part_length,
        )
        self._files.append(target)
        return target

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
    ) -> tuple[MultiDict[str, str], MultiDict[str, FileStorage]]:
        """Read ``stream`` in ``buffer_size`` chunks and collect its parts.

        :param stream: the input stream holding the multipart body.
        :param boundary: the multipart boundary.
        :param content_length: total content length, passed on to the stream
            factory for file parts.
        :return: A ``(form, files)`` pair of multidicts.
        :raises RequestEntityTooLarge: if the accumulated data of a form
            field exceeds ``max_form_memory_size``.
        """
        part: Field | File
        buffered: int | None = None
        sink: t.IO[bytes] | list[bytes]
        write_chunk: t.Callable[[bytes], t.Any]

        decoder = MultipartDecoder(
            boundary,
            max_form_memory_size=self.max_form_memory_size,
            max_parts=self.max_form_parts,
        )
        form_items = []
        file_items = []

        for chunk in _chunk_iter(stream.read, self.buffer_size):
            decoder.receive_data(chunk)

            # Drain every event the decoder can produce from the data so far.
            while True:
                event = decoder.next_event()

                if isinstance(event, (Epilogue, NeedData)):
                    break

                if isinstance(event, Field):
                    # Form fields are buffered in memory and size-tracked.
                    part, buffered, sink = event, 0, []
                    write_chunk = sink.append
                    continue

                if isinstance(event, File):
                    # File parts go to a factory stream and are not counted.
                    part, buffered = event, None
                    sink = self.start_file_streaming(event, content_length)
                    write_chunk = sink.write
                    continue

                if not isinstance(event, Data):
                    continue

                if self.max_form_memory_size is not None and buffered is not None:
                    # Ensure that accumulated data events do not exceed limit.
                    # Also checked within single event in MultipartDecoder.
                    buffered += len(event.data)

                    if buffered > self.max_form_memory_size:
                        raise RequestEntityTooLarge()

                write_chunk(event.data)

                if event.more_data:
                    continue

                # The current part is complete; store it.
                if isinstance(part, Field):
                    raw = b"".join(sink)
                    text = raw.decode(self.get_part_charset(part.headers), "replace")
                    form_items.append((part.name, text))
                else:
                    sink = t.cast(t.IO[bytes], sink)
                    sink.seek(0)
                    file_items.append(
                        (
                            part.name,
                            FileStorage(
                                sink,
                                part.filename,
                                part.name,
                                headers=part.headers,
                            ),
                        )
                    )

        result_type = ImmutableMultiDict if self.cls is None else self.cls
        return result_type(form_items), result_type(file_items)
def _chunk_iter(read: t.Callable[[int], bytes], size: int) -> t.Iterator[bytes | None]:
    """Yield chunks of up to ``size`` bytes for multipart/form-data parsing.

    ``read`` is called repeatedly until it returns something falsy. A final
    ``None`` is then yielded to signal the end of the data.
    """
    while chunk := read(size):
        yield chunk

    yield None