Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/formparser.py: 25%
148 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
1from __future__ import annotations
3import typing as t
4from io import BytesIO
5from urllib.parse import parse_qsl
7from ._internal import _plain_int
8from .datastructures import FileStorage
9from .datastructures import Headers
10from .datastructures import MultiDict
11from .exceptions import RequestEntityTooLarge
12from .http import parse_options_header
13from .sansio.multipart import Data
14from .sansio.multipart import Epilogue
15from .sansio.multipart import Field
16from .sansio.multipart import File
17from .sansio.multipart import MultipartDecoder
18from .sansio.multipart import NeedData
19from .wsgi import get_content_length
20from .wsgi import get_input_stream
# there are some platforms where SpooledTemporaryFile is not available.
# In that case we need to provide a fallback.
try:
    from tempfile import SpooledTemporaryFile
except ImportError:
    # Fall back to plain disk-backed temp files; ``default_stream_factory``
    # checks ``SpooledTemporaryFile is None`` to pick which branch to use.
    from tempfile import TemporaryFile

    SpooledTemporaryFile = None  # type: ignore
if t.TYPE_CHECKING:
    import typing as te
    from _typeshed.wsgi import WSGIEnvironment

    # Common return shape of every parser below: (stream, form, files).
    t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]

    class TStreamFactory(te.Protocol):
        """Structural type for callables that allocate the writable stream a
        file upload is spooled into (see :func:`default_stream_factory`).
        """

        def __call__(
            self,
            total_content_length: int | None,
            content_type: str | None,
            filename: str | None,
            content_length: int | None = None,
        ) -> t.IO[bytes]:
            ...


# Generic callable TypeVar; not referenced in this excerpt — presumably used
# by decorator helpers elsewhere in the module (confirm before removing).
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
51def default_stream_factory(
52 total_content_length: int | None,
53 content_type: str | None,
54 filename: str | None,
55 content_length: int | None = None,
56) -> t.IO[bytes]:
57 max_size = 1024 * 500
59 if SpooledTemporaryFile is not None:
60 return t.cast(t.IO[bytes], SpooledTemporaryFile(max_size=max_size, mode="rb+"))
61 elif total_content_length is None or total_content_length > max_size:
62 return t.cast(t.IO[bytes], TemporaryFile("rb+"))
64 return BytesIO()
def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: TStreamFactory | None = None,
    max_form_memory_size: int | None = None,
    max_content_length: int | None = None,
    cls: type[MultiDict] | None = None,
    silent: bool = True,
    *,
    max_form_parts: int | None = None,
) -> t_parse_result:
    """Parse the form data in the environ and return it as tuple in the form
    ``(stream, form, files)``.  You should only call this method if the
    transport method is `POST`, `PUT`, or `PATCH`.

    For a ``multipart/form-data`` body the files multidict is filled with
    `FileStorage` objects.  For an unknown mimetype the wrapped input
    stream is returned as the first tuple item; otherwise that stream is
    empty.

    This is a shortcut for the common usage of :class:`FormDataParser`.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor.  This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data.  If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param cls: an optional dict class to use.  If this is not specified
        or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed.  If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :return: A tuple in the form ``(stream, form, files)``.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.5.1
        Added the ``silent`` parameter.

    .. versionadded:: 0.5
        Added the ``max_form_memory_size``, ``max_content_length``, and
        ``cls`` parameters.
    """
    parser = FormDataParser(
        stream_factory=stream_factory,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        cls=cls,
        silent=silent,
        max_form_parts=max_form_parts,
    )
    return parser.parse_from_environ(environ)
class FormDataParser:
    """Form data parsing for Werkzeug.

    By itself this class can parse multipart and url encoded form data.
    It can be subclassed and extended, but for most mimetypes it is a
    better idea to use the untouched stream and expose it as separate
    attributes on a request object.

    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor.  This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data.  If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param cls: an optional dict class to use.  If this is not specified
        or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed.  If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 3.0
        The ``parse_functions`` attribute and ``get_parse_func`` methods
        were removed.

    .. versionchanged:: 2.2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.8
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        max_content_length: int | None = None,
        cls: type[MultiDict] | None = None,
        silent: bool = True,
        *,
        max_form_parts: int | None = None,
    ) -> None:
        # Fall back to module defaults where the caller left a hook unset.
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts
        self.cls = MultiDict if cls is None else cls
        self.silent = silent

    def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
        """Parses the information from the environment as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        # ``get_input_stream`` already enforces ``max_content_length``.
        stream = get_input_stream(environ, max_content_length=self.max_content_length)
        mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
        return self.parse(
            stream,
            content_length=get_content_length(environ),
            mimetype=mimetype,
            options=options,
        )

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str] | None = None,
    ) -> t_parse_result:
        """Parses the information from the given stream, mimetype,
        content length and mimetype parameters.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for
            the multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.

        .. versionchanged:: 3.0
            The invalid ``application/x-url-encoded`` content type is not
            treated as ``application/x-www-form-urlencoded``.
        """
        # Dispatch on the exact mimetype; any other type passes through
        # untouched with empty form/files multidicts.
        handlers: dict[str, t.Callable[..., t_parse_result]] = {
            "multipart/form-data": self._parse_multipart,
            "application/x-www-form-urlencoded": self._parse_urlencoded,
        }
        parse_func = handlers.get(mimetype)

        if parse_func is None:
            return stream, self.cls(), self.cls()

        try:
            return parse_func(stream, mimetype, content_length, options or {})
        except ValueError:
            if not self.silent:
                raise

        # Silent mode: swallow the parse error and behave like "no form".
        return stream, self.cls(), self.cls()

    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        # The boundary comes from the Content-Type parameters; without it
        # the body cannot be split into parts.
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        parser = MultiPartParser(
            stream_factory=self.stream_factory,
            max_form_memory_size=self.max_form_memory_size,
            max_form_parts=self.max_form_parts,
            cls=self.cls,
        )
        form, files = parser.parse(stream, boundary, content_length)
        return stream, form, files

    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        # Reject early when the declared length already exceeds the limit.
        declared_too_large = (
            self.max_form_memory_size is not None
            and content_length is not None
            and content_length > self.max_form_memory_size
        )

        if declared_too_large:
            raise RequestEntityTooLarge()

        try:
            items = parse_qsl(
                stream.read().decode(),
                keep_blank_values=True,
                errors="werkzeug.url_quote",
            )
        except ValueError as e:
            raise RequestEntityTooLarge() from e

        return stream, self.cls(items), self.cls()
class MultiPartParser:
    """Parse a ``multipart/form-data`` body into form fields and files.

    Field parts are collected in memory; file parts are streamed into the
    object returned by ``stream_factory``.

    :param stream_factory: callable allocating the writable stream a file
        part is written to; defaults to :func:`default_stream_factory`.
    :param max_form_memory_size: maximum number of bytes accepted for
        in-memory (non-file) field data before
        :exc:`~exceptions.RequestEntityTooLarge` is raised.
    :param cls: the :class:`MultiDict` subclass used for the results.
    :param buffer_size: chunk size used when reading the input stream.
    :param max_form_parts: maximum number of parts before the decoder
        raises :exc:`~exceptions.RequestEntityTooLarge`.
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        cls: type[MultiDict] | None = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: int | None = None,
    ) -> None:
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts

        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory

        if cls is None:
            cls = MultiDict

        self.cls = cls
        self.buffer_size = buffer_size

    def fail(self, message: str) -> te.NoReturn:
        """Abort parsing with a :exc:`ValueError` carrying *message*."""
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        """Return the charset used to decode the current part's bytes."""
        # Figure out input charset for current part
        content_type = headers.get("content-type")

        if content_type:
            parameters = parse_options_header(content_type)[1]
            ct_charset = parameters.get("charset", "").lower()

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further.
            if ct_charset in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                return ct_charset

        return "utf-8"

    def start_file_streaming(
        self, event: File, total_content_length: int | None
    ) -> t.IO[bytes]:
        """Allocate the output stream a file part will be written into."""
        content_type = event.headers.get("content-type")

        try:
            content_length = _plain_int(event.headers["content-length"])
        except (KeyError, ValueError):
            # Missing or malformed per-part Content-Length: report 0.
            content_length = 0

        container = self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=content_length,
        )
        return container

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
    ) -> tuple[MultiDict, MultiDict]:
        """Parse *stream* and return ``(fields, files)`` multidicts.

        :param stream: the raw request body stream.
        :param boundary: the multipart boundary from the Content-Type header.
        :param content_length: total declared content length, if known.
        :raises RequestEntityTooLarge: when an in-memory field grows past
            ``max_form_memory_size`` or the part limit is exceeded.
        """
        current_part: Field | File
        container: t.IO[bytes] | list[bytes]
        _write: t.Callable[[bytes], t.Any]
        # Running byte count of the current non-file field; ``None`` while a
        # file part is active (files spool to disk and are not limited here).
        field_size: int | None = None

        parser = MultipartDecoder(
            boundary,
            max_form_memory_size=self.max_form_memory_size,
            max_parts=self.max_form_parts,
        )

        fields = []
        files = []

        for data in _chunk_iter(stream.read, self.buffer_size):
            parser.receive_data(data)
            event = parser.next_event()
            while not isinstance(event, (Epilogue, NeedData)):
                if isinstance(event, Field):
                    current_part = event
                    field_size = 0
                    container = []
                    _write = container.append
                elif isinstance(event, File):
                    current_part = event
                    field_size = None
                    container = self.start_file_streaming(event, content_length)
                    _write = container.write
                elif isinstance(event, Data):
                    if self.max_form_memory_size is not None and field_size is not None:
                        # Fix (CVE-2024-49767): enforce max_form_memory_size
                        # on the *accumulated* size of a field, not only per
                        # chunk — otherwise a field split across many chunks
                        # could grow without bound in memory.
                        field_size += len(event.data)

                        if field_size > self.max_form_memory_size:
                            raise RequestEntityTooLarge()

                    _write(event.data)
                    if not event.more_data:
                        if isinstance(current_part, Field):
                            value = b"".join(container).decode(
                                self.get_part_charset(current_part.headers), "replace"
                            )
                            fields.append((current_part.name, value))
                        else:
                            container = t.cast(t.IO[bytes], container)
                            # Rewind so FileStorage reads from the start.
                            container.seek(0)
                            files.append(
                                (
                                    current_part.name,
                                    FileStorage(
                                        container,
                                        current_part.filename,
                                        current_part.name,
                                        headers=current_part.headers,
                                    ),
                                )
                            )

                event = parser.next_event()

        return self.cls(fields), self.cls(files)
409def _chunk_iter(read: t.Callable[[int], bytes], size: int) -> t.Iterator[bytes | None]:
410 """Read data in chunks for multipart/form-data parsing. Stop if no data is read.
411 Yield ``None`` at the end to signal end of parsing.
412 """
413 while True:
414 data = read(size)
416 if not data:
417 break
419 yield data
421 yield None