Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/formparser.py: 21%
187 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-09 06:08 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-09 06:08 +0000
1from __future__ import annotations
3import typing as t
4import warnings
5from io import BytesIO
6from urllib.parse import parse_qsl
8from ._internal import _plain_int
9from .datastructures import FileStorage
10from .datastructures import Headers
11from .datastructures import MultiDict
12from .exceptions import RequestEntityTooLarge
13from .http import parse_options_header
14from .sansio.multipart import Data
15from .sansio.multipart import Epilogue
16from .sansio.multipart import Field
17from .sansio.multipart import File
18from .sansio.multipart import MultipartDecoder
19from .sansio.multipart import NeedData
20from .wsgi import get_content_length
21from .wsgi import get_input_stream
# SpooledTemporaryFile is not available on every platform. When the import
# fails, TemporaryFile is imported instead and a ``None`` sentinel is left
# behind so that code further down can detect the missing class.
try:
    from tempfile import SpooledTemporaryFile
except ImportError:
    from tempfile import TemporaryFile

    SpooledTemporaryFile = None  # type: ignore

# Everything in this branch exists only for static type checkers.
if t.TYPE_CHECKING:
    import typing as te
    from _typeshed.wsgi import WSGIEnvironment

    # Shape of a parse result: ``(stream, form, files)``.
    t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]

    class TStreamFactory(te.Protocol):
        """Structural type of the callables accepted as ``stream_factory``."""

        def __call__(
            self,
            total_content_length: int | None,
            content_type: str | None,
            filename: str | None,
            content_length: int | None = None,
        ) -> t.IO[bytes]:
            ...


# Generic type variable bound to any callable.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
def default_stream_factory(
    total_content_length: int | None,
    content_type: str | None,
    filename: str | None,
    content_length: int | None = None,
) -> t.IO[bytes]:
    """Return a new, empty binary stream for storing an uploaded file.

    :param total_content_length: content length of the whole request, or
        ``None`` if it is not known. Only consulted when
        ``SpooledTemporaryFile`` is unavailable.
    :param content_type: content type of the part (not used here).
    :param filename: filename of the part (not used here).
    :param content_length: content length of the part (not used here).
    :return: a ``SpooledTemporaryFile`` when that class is available.
        Otherwise a ``TemporaryFile`` if the total length is unknown or
        larger than 500 KiB, else a ``BytesIO``.
    """
    spool_limit = 1024 * 500

    if SpooledTemporaryFile is None:
        # Fallback path: choose between disk and memory up front, based on
        # the announced size of the request.
        if total_content_length is None or total_content_length > spool_limit:
            return t.cast(t.IO[bytes], TemporaryFile("rb+"))

        return BytesIO()

    spooled = SpooledTemporaryFile(max_size=spool_limit, mode="rb+")
    return t.cast(t.IO[bytes], spooled)
def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: TStreamFactory | None = None,
    charset: str | None = None,
    errors: str | None = None,
    max_form_memory_size: int | None = None,
    max_content_length: int | None = None,
    cls: type[MultiDict] | None = None,
    silent: bool = True,
    *,
    max_form_parts: int | None = None,
) -> t_parse_result:
    """Parse the form data of a WSGI environ into ``(stream, form, files)``.

    This is a shortcut for the common usage of :class:`FormDataParser`: a
    parser is created from the given options and its ``parse_from_environ``
    method is called with ``environ``. Only call this if the transport
    method is `POST`, `PUT`, or `PATCH`.

    For `multipart/form-data` the files multidict is filled with
    `FileStorage` objects. If the mimetype is unknown the input stream is
    wrapped and returned as the first item, otherwise the stream is empty.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor. This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data. If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param cls: an optional dict class to use. If this is not specified or
        `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :return: A tuple in the form ``(stream, form, files)``.

    .. versionchanged:: 2.3
        Added the ``max_form_parts`` parameter.

    .. versionchanged:: 2.3
        The ``charset`` and ``errors`` parameters are deprecated and will be
        removed in Werkzeug 3.0.

    .. versionadded:: 0.5.1
        Added the ``silent`` parameter.

    .. versionadded:: 0.5
        Added the ``max_form_memory_size``, ``max_content_length``, and
        ``cls`` parameters.
    """
    form_parser = FormDataParser(
        stream_factory=stream_factory,
        charset=charset,
        errors=errors,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        max_form_parts=max_form_parts,
        silent=silent,
        cls=cls,
    )
    return form_parser.parse_from_environ(environ)
class FormDataParser:
    """Parser for form data submitted with a request.

    On its own the class handles multipart and url encoded form data. It
    can be subclassed and extended, but for most mimetypes it is a better
    idea to use the untouched stream and expose it as separate attributes
    on a request object.

    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor. This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data. If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param cls: an optional dict class to use. If this is not specified or
        `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.

    .. versionchanged:: 2.3
        The ``charset`` and ``errors`` parameters are deprecated and will be
        removed in Werkzeug 3.0.

    .. versionchanged:: 2.3
        The ``parse_functions`` attribute and ``get_parse_func`` methods are
        deprecated and will be removed in Werkzeug 3.0.

    .. versionchanged:: 2.2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.8
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        charset: str | None = None,
        errors: str | None = None,
        max_form_memory_size: int | None = None,
        max_content_length: int | None = None,
        cls: type[MultiDict] | None = None,
        silent: bool = True,
        *,
        max_form_parts: int | None = None,
    ) -> None:
        """Store the parser options, filling in defaults for omitted ones.

        Passing ``charset`` or ``errors`` explicitly emits a
        ``DeprecationWarning``; when omitted they default to ``"utf-8"``
        and ``"replace"``.
        """
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )

        if charset is None:
            charset = "utf-8"
        else:
            warnings.warn(
                "The 'charset' parameter is deprecated and will be"
                " removed in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.charset = charset

        if errors is None:
            errors = "replace"
        else:
            warnings.warn(
                "The 'errors' parameter is deprecated and will be"
                " removed in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts
        self.cls = MultiDict if cls is None else cls
        self.silent = silent

    def get_parse_func(
        self, mimetype: str, options: dict[str, str]
    ) -> None | (
        t.Callable[
            [FormDataParser, t.IO[bytes], str, int | None, dict[str, str]],
            t_parse_result,
        ]
    ):
        """Return the unbound parse function for ``mimetype``, if any.

        Deprecated: every call emits a ``DeprecationWarning``.

        :param mimetype: the mimetype to look up.
        :param options: mimetype parameters (not used by the lookup).
        :return: the parse function taken from the class or from
            ``parse_functions``, or ``None`` for an unsupported mimetype.
        """
        warnings.warn(
            "The 'get_parse_func' method is deprecated and will be"
            " removed in Werkzeug 3.0.",
            DeprecationWarning,
            stacklevel=2,
        )
        owner = type(self)

        if mimetype == "multipart/form-data":
            return owner._parse_multipart

        if mimetype == "application/x-www-form-urlencoded":
            return owner._parse_urlencoded

        if mimetype == "application/x-url-encoded":
            warnings.warn(
                "The 'application/x-url-encoded' mimetype is invalid, and will not be"
                " treated as 'application/x-www-form-urlencoded' in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )
            return owner._parse_urlencoded

        if mimetype in self.parse_functions:
            warnings.warn(
                "The 'parse_functions' attribute is deprecated and will be removed in"
                " Werkzeug 3.0. Override 'parse' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            return self.parse_functions[mimetype]

        return None

    def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
        """Parse the form data described by a WSGI environment.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        input_stream = get_input_stream(
            environ, max_content_length=self.max_content_length
        )
        length = get_content_length(environ)
        mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
        return self.parse(
            input_stream,
            content_length=length,
            mimetype=mimetype,
            options=options,
        )

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str] | None = None,
    ) -> t_parse_result:
        """Parse ``stream`` according to ``mimetype``.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for the
            multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``. For an
            unsupported mimetype, or when a ``ValueError`` is raised while
            parsing and ``silent`` is true, both containers are empty.

        .. versionchanged:: 2.3
            The ``application/x-url-encoded`` content type is deprecated
            and will not be treated as
            ``application/x-www-form-urlencoded`` in Werkzeug 3.0.
        """
        if mimetype == "multipart/form-data":
            handler = self._parse_multipart
        elif mimetype == "application/x-www-form-urlencoded":
            handler = self._parse_urlencoded
        elif mimetype == "application/x-url-encoded":
            warnings.warn(
                "The 'application/x-url-encoded' mimetype is invalid, and will not be"
                " treated as 'application/x-www-form-urlencoded' in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )
            handler = self._parse_urlencoded
        elif mimetype in self.parse_functions:
            warnings.warn(
                "The 'parse_functions' attribute is deprecated and will be removed in"
                " Werkzeug 3.0. Override 'parse' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            # Registered functions are plain functions; bind them to this
            # instance before calling.
            handler = self.parse_functions[mimetype].__get__(self, type(self))
        else:
            # Nothing knows how to parse this mimetype; hand the stream back.
            return stream, self.cls(), self.cls()

        if options is None:
            options = {}

        try:
            return handler(stream, mimetype, content_length, options)
        except ValueError:
            if self.silent:
                return stream, self.cls(), self.cls()

            raise

    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse a ``multipart/form-data`` body with :class:`MultiPartParser`.

        :raises ValueError: if ``options`` carries no ``boundary``.
        """
        # The defaults are passed on as ``None`` so that MultiPartParser
        # does not emit its deprecation warnings for them.
        part_charset = None if self.charset == "utf-8" else self.charset
        part_errors = None if self.errors == "replace" else self.errors
        multipart = MultiPartParser(
            stream_factory=self.stream_factory,
            charset=part_charset,
            errors=part_errors,
            max_form_memory_size=self.max_form_memory_size,
            max_form_parts=self.max_form_parts,
            cls=self.cls,
        )
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        form, files = multipart.parse(stream, boundary, content_length)
        return stream, form, files

    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse an url encoded body into the form container.

        :raises RequestEntityTooLarge: if ``content_length`` is known and
            exceeds ``max_form_memory_size``, or if reading, decoding or
            ``parse_qsl`` raises a ``ValueError``.
        """
        limit = self.max_form_memory_size

        if limit is not None and content_length is not None:
            if content_length > limit:
                raise RequestEntityTooLarge()

        try:
            body = stream.read().decode()
            items = parse_qsl(
                body,
                keep_blank_values=True,
                encoding=self.charset,
                errors="werkzeug.url_quote",
            )
        except ValueError as e:
            raise RequestEntityTooLarge() from e

        return stream, self.cls(items), self.cls()

    # Deprecated registry of extra parse functions, keyed by mimetype.
    parse_functions: dict[
        str,
        t.Callable[
            [FormDataParser, t.IO[bytes], str, int | None, dict[str, str]],
            t_parse_result,
        ],
    ] = {}
class MultiPartParser:
    """Parse a ``multipart/form-data`` stream into form fields and files.

    The stream is read in chunks of ``buffer_size`` bytes and fed to a
    ``MultipartDecoder``. Field values are collected in memory; file parts
    are written to streams obtained from ``stream_factory``.
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        charset: str | None = None,
        errors: str | None = None,
        max_form_memory_size: int | None = None,
        cls: type[MultiDict] | None = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: int | None = None,
    ) -> None:
        """Store the parser options, filling in defaults for omitted ones.

        Passing ``charset`` or ``errors`` explicitly emits a
        ``DeprecationWarning``; when omitted they default to ``"utf-8"``
        and ``"replace"``.
        """
        if charset is None:
            charset = "utf-8"
        else:
            warnings.warn(
                "The 'charset' parameter is deprecated and will be"
                " removed in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.charset = charset

        if errors is None:
            errors = "replace"
        else:
            warnings.warn(
                "The 'errors' parameter is deprecated and will be"
                " removed in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )
        self.cls = MultiDict if cls is None else cls
        self.buffer_size = buffer_size

    def fail(self, message: str) -> te.NoReturn:
        """Abort parsing by raising ``ValueError`` with ``message``."""
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        """Return the charset to decode the current part with.

        The charset declared in the part's ``content-type`` header is used
        only if it is on the safe list below; otherwise the parser's own
        charset is returned.
        """
        content_type = headers.get("content-type")

        if not content_type:
            return self.charset

        declared = parse_options_header(content_type)[1].get("charset", "").lower()

        # A safe list of encodings. Modern clients should only send ASCII or
        # UTF-8. This list will not be extended further.
        if declared in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
            return declared

        return self.charset

    def start_file_streaming(
        self, event: File, total_content_length: int | None
    ) -> t.IO[bytes]:
        """Ask the stream factory for a stream to hold the file in ``event``.

        The part's ``content-length`` header is passed along; it is ``0``
        when the header is missing or cannot be parsed as an integer.
        """
        content_type = event.headers.get("content-type")

        try:
            part_length = _plain_int(event.headers["content-length"])
        except (KeyError, ValueError):
            part_length = 0

        return self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=part_length,
        )

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
    ) -> tuple[MultiDict, MultiDict]:
        """Parse ``stream`` and return the ``(form, files)`` containers.

        :param stream: the input stream holding the multipart body.
        :param boundary: the multipart boundary.
        :param content_length: content length of the whole body; forwarded
            to the stream factory for every file part.
        """
        current_part: Field | File
        sink: t.IO[bytes] | list[bytes]
        write_chunk: t.Callable[[bytes], t.Any]

        decoder = MultipartDecoder(
            boundary,
            max_form_memory_size=self.max_form_memory_size,
            max_parts=self.max_form_parts,
        )

        fields = []
        files = []

        for chunk in _chunk_iter(stream.read, self.buffer_size):
            decoder.receive_data(chunk)

            # Drain every event the decoder can produce from the data it
            # has received so far.
            while True:
                event = decoder.next_event()

                if isinstance(event, (Epilogue, NeedData)):
                    break

                if isinstance(event, Field):
                    current_part = event
                    sink = []
                    write_chunk = sink.append
                elif isinstance(event, File):
                    current_part = event
                    sink = self.start_file_streaming(event, content_length)
                    write_chunk = sink.write
                elif isinstance(event, Data):
                    write_chunk(event.data)

                    if event.more_data:
                        continue

                    # The part is complete: store it as a field or a file.
                    if isinstance(current_part, Field):
                        value = b"".join(sink).decode(
                            self.get_part_charset(current_part.headers), self.errors
                        )
                        fields.append((current_part.name, value))
                    else:
                        sink = t.cast(t.IO[bytes], sink)
                        sink.seek(0)
                        storage = FileStorage(
                            sink,
                            current_part.filename,
                            current_part.name,
                            headers=current_part.headers,
                        )
                        files.append((current_part.name, storage))

        return self.cls(fields), self.cls(files)
535def _chunk_iter(read: t.Callable[[int], bytes], size: int) -> t.Iterator[bytes | None]:
536 """Read data in chunks for multipart/form-data parsing. Stop if no data is read.
537 Yield ``None`` at the end to signal end of parsing.
538 """
539 while True:
540 data = read(size)
542 if not data:
543 break
545 yield data
547 yield None