Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/formparser.py: 27%
171 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1import typing as t
2from functools import update_wrapper
3from io import BytesIO
4from itertools import chain
5from typing import Union
7from . import exceptions
8from .datastructures import FileStorage
9from .datastructures import Headers
10from .datastructures import MultiDict
11from .http import parse_options_header
12from .sansio.multipart import Data
13from .sansio.multipart import Epilogue
14from .sansio.multipart import Field
15from .sansio.multipart import File
16from .sansio.multipart import MultipartDecoder
17from .sansio.multipart import NeedData
18from .urls import url_decode_stream
19from .wsgi import _make_chunk_iter
20from .wsgi import get_content_length
21from .wsgi import get_input_stream
# There are some platforms where SpooledTemporaryFile is not available.
# In that case we need to provide a fallback.
try:
    from tempfile import SpooledTemporaryFile
except ImportError:
    from tempfile import TemporaryFile

    # Sentinel checked by ``default_stream_factory`` to select the fallback
    # (``TemporaryFile`` or ``BytesIO``) instead of a spooled file.
    SpooledTemporaryFile = None  # type: ignore
if t.TYPE_CHECKING:
    import typing as te
    from _typeshed.wsgi import WSGIEnvironment

    # Result type shared by all form-data parsers: ``(stream, form, files)``.
    t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]

    class TStreamFactory(te.Protocol):
        # Structural type of ``default_stream_factory`` and any user-supplied
        # ``stream_factory`` callable; it must return a read/write byte stream.
        def __call__(
            self,
            total_content_length: t.Optional[int],
            content_type: t.Optional[str],
            filename: t.Optional[str],
            content_length: t.Optional[int] = None,
        ) -> t.IO[bytes]:
            ...
# Type variable used by decorators (e.g. :func:`exhaust_stream`) so the
# wrapper is typed as returning the same callable type it received.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
52def _exhaust(stream: t.IO[bytes]) -> None:
53 bts = stream.read(64 * 1024)
54 while bts:
55 bts = stream.read(64 * 1024)
def default_stream_factory(
    total_content_length: t.Optional[int],
    content_type: t.Optional[str],
    filename: t.Optional[str],
    content_length: t.Optional[int] = None,
) -> t.IO[bytes]:
    """Return a writable stream for buffering uploaded data.

    Prefers a :class:`~tempfile.SpooledTemporaryFile`, which stays in memory
    up to the spool threshold and then overflows to disk. On platforms
    without it, large or unknown-length bodies go straight to a temporary
    file and small ones stay in a :class:`~io.BytesIO`.
    """
    spool_threshold = 1024 * 500

    if SpooledTemporaryFile is not None:
        spooled = SpooledTemporaryFile(max_size=spool_threshold, mode="rb+")
        return t.cast(t.IO[bytes], spooled)

    # Fallback path: no way to spool, so choose memory vs. disk up front.
    if total_content_length is None or total_content_length > spool_threshold:
        return t.cast(t.IO[bytes], TemporaryFile("rb+"))

    return BytesIO()
def parse_form_data(
    environ: "WSGIEnvironment",
    stream_factory: t.Optional["TStreamFactory"] = None,
    charset: str = "utf-8",
    errors: str = "replace",
    max_form_memory_size: t.Optional[int] = None,
    max_content_length: t.Optional[int] = None,
    cls: t.Optional[t.Type[MultiDict]] = None,
    silent: bool = True,
) -> "t_parse_result":
    """Parse the form data in the environ and return it as tuple in the form
    ``(stream, form, files)``. You should only call this method if the
    transport method is `POST`, `PUT`, or `PATCH`.

    If the mimetype of the data transmitted is `multipart/form-data` the
    files multidict will be filled with `FileStorage` objects. If the
    mimetype is unknown the input stream is wrapped and returned as first
    argument, else the stream is empty.

    This is a shortcut for the common usage of :class:`FormDataParser`.

    Have a look at :doc:`/request_data` for more details.

    .. versionadded:: 0.5
       The `max_form_memory_size`, `max_content_length` and
       `cls` parameters were added.

    .. versionadded:: 0.5.1
       The optional `silent` flag was added.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param charset: The character set for URL and url encoded form data.
    :param errors: The encoding error behavior.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :return: A tuple in the form ``(stream, form, files)``.
    """
    # Build a one-shot parser; keyword arguments keep the mapping onto
    # ``FormDataParser.__init__`` explicit.
    parser = FormDataParser(
        stream_factory=stream_factory,
        charset=charset,
        errors=errors,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        cls=cls,
        silent=silent,
    )
    return parser.parse_from_environ(environ)
def exhaust_stream(f: F) -> F:
    """Helper decorator for methods that exhausts the stream on return."""

    def wrapper(self, stream, *args, **kwargs):  # type: ignore
        try:
            return f(self, stream, *args, **kwargs)
        finally:
            # Prefer the stream's own ``exhaust`` method (Werkzeug's limited
            # streams provide one); otherwise drain it manually in 64 KiB reads.
            drain = getattr(stream, "exhaust", None)

            if drain is None:
                chunk = stream.read(1024 * 64)
                while chunk:
                    chunk = stream.read(1024 * 64)
            else:
                drain()

    return update_wrapper(t.cast(F, wrapper), f)
class FormDataParser:
    """This class implements parsing of form data for Werkzeug. By itself
    it can parse multipart and url encoded form data. It can be subclassed
    and extended but for most mimetypes it is a better idea to use the
    untouched stream and expose it as separate attributes on a request
    object.

    .. versionadded:: 0.8

    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param charset: The character set for URL and url encoded form data.
    :param errors: The encoding error behavior.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of parts to be parsed. If this is
        exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    """

    def __init__(
        self,
        stream_factory: t.Optional["TStreamFactory"] = None,
        charset: str = "utf-8",
        errors: str = "replace",
        max_form_memory_size: t.Optional[int] = None,
        max_content_length: t.Optional[int] = None,
        cls: t.Optional[t.Type[MultiDict]] = None,
        silent: bool = True,
        *,
        max_form_parts: t.Optional[int] = None,
    ) -> None:
        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts

        if cls is None:
            cls = MultiDict

        self.cls = cls
        self.silent = silent

    def get_parse_func(
        self, mimetype: str, options: t.Dict[str, str]
    ) -> t.Optional[
        t.Callable[
            ["FormDataParser", t.IO[bytes], str, t.Optional[int], t.Dict[str, str]],
            "t_parse_result",
        ]
    ]:
        # Look up the parser for this mimetype in ``parse_functions``;
        # ``options`` is unused here but allows subclasses to dispatch on
        # mimetype parameters as well. Returns ``None`` for unknown types.
        return self.parse_functions.get(mimetype)

    def parse_from_environ(self, environ: "WSGIEnvironment") -> "t_parse_result":
        """Parses the information from the environment as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        content_type = environ.get("CONTENT_TYPE", "")
        content_length = get_content_length(environ)
        # Split e.g. "multipart/form-data; boundary=..." into mimetype and
        # its parameter dict (the boundary is needed by the multipart parser).
        mimetype, options = parse_options_header(content_type)
        return self.parse(get_input_stream(environ), mimetype, content_length, options)

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: t.Optional[int],
        options: t.Optional[t.Dict[str, str]] = None,
    ) -> "t_parse_result":
        """Parses the information from the given stream, mimetype,
        content length and mimetype parameters.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for
                        the multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.
        """
        if (
            self.max_content_length is not None
            and content_length is not None
            and content_length > self.max_content_length
        ):
            # if the input stream is not exhausted, firefox reports Connection Reset
            _exhaust(stream)
            raise exceptions.RequestEntityTooLarge()

        if options is None:
            options = {}

        parse_func = self.get_parse_func(mimetype, options)

        if parse_func is not None:
            try:
                # ``parse_func`` is a plain function from ``parse_functions``,
                # not a bound method, so ``self`` is passed explicitly.
                return parse_func(self, stream, mimetype, content_length, options)
            except ValueError:
                # With ``silent`` (the default) parsing errors yield empty
                # form/files instead of propagating.
                if not self.silent:
                    raise

        # Unknown mimetype or (silenced) parse failure: untouched stream,
        # empty form and files multidicts.
        return stream, self.cls(), self.cls()

    @exhaust_stream
    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: t.Optional[int],
        options: t.Dict[str, str],
    ) -> "t_parse_result":
        parser = MultiPartParser(
            self.stream_factory,
            self.charset,
            self.errors,
            max_form_memory_size=self.max_form_memory_size,
            cls=self.cls,
            max_form_parts=self.max_form_parts,
        )
        # The boundary comes from the Content-Type parameters parsed by the
        # caller; it is required for multipart parsing.
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        form, files = parser.parse(stream, boundary, content_length)
        return stream, form, files

    @exhaust_stream
    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: t.Optional[int],
        options: t.Dict[str, str],
    ) -> "t_parse_result":
        if (
            self.max_form_memory_size is not None
            and content_length is not None
            and content_length > self.max_form_memory_size
        ):
            # if the input stream is not exhausted, firefox reports Connection Reset
            _exhaust(stream)
            raise exceptions.RequestEntityTooLarge()

        form = url_decode_stream(stream, self.charset, errors=self.errors, cls=self.cls)
        return stream, form, self.cls()

    #: mapping of mimetypes to parsing functions
    # NOTE: the values are the undecorated class-body functions (stored in a
    # dict they do not become bound methods); ``parse`` supplies ``self``.
    parse_functions: t.Dict[
        str,
        t.Callable[
            ["FormDataParser", t.IO[bytes], str, t.Optional[int], t.Dict[str, str]],
            "t_parse_result",
        ],
    ] = {
        "multipart/form-data": _parse_multipart,
        "application/x-www-form-urlencoded": _parse_urlencoded,
        "application/x-url-encoded": _parse_urlencoded,
    }
333def _line_parse(line: str) -> t.Tuple[str, bool]:
334 """Removes line ending characters and returns a tuple (`stripped_line`,
335 `is_terminated`).
336 """
337 if line[-2:] == "\r\n":
338 return line[:-2], True
340 elif line[-1:] in {"\r", "\n"}:
341 return line[:-1], True
343 return line, False
class MultiPartParser:
    """Parses ``multipart/form-data`` bodies into form fields and uploaded
    files by feeding chunks to the sans-IO :class:`MultipartDecoder` and
    acting on the events it emits.

    :param stream_factory: optional callable producing the writable stream
        that file uploads are buffered into (defaults to
        :func:`default_stream_factory`).
    :param charset: fallback charset for decoding field values when a part
        does not declare its own.
    :param errors: decoding error handler for field values.
    :param max_form_memory_size: forwarded to :class:`MultipartDecoder`.
    :param cls: multidict class for the returned form/files containers
        (defaults to :class:`MultiDict`).
    :param buffer_size: chunk size used when reading the input stream.
    :param max_form_parts: maximum number of parts, forwarded to
        :class:`MultipartDecoder`.
    """

    def __init__(
        self,
        stream_factory: t.Optional["TStreamFactory"] = None,
        charset: str = "utf-8",
        errors: str = "replace",
        max_form_memory_size: t.Optional[int] = None,
        cls: t.Optional[t.Type[MultiDict]] = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: t.Optional[int] = None,
    ) -> None:
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts

        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory

        if cls is None:
            cls = MultiDict

        self.cls = cls

        self.buffer_size = buffer_size

    def fail(self, message: str) -> "te.NoReturn":
        # Central error hook: parsing problems surface as ValueError, which
        # FormDataParser.parse catches when ``silent`` is set.
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        # Figure out input charset for current part
        content_type = headers.get("content-type")

        if content_type:
            mimetype, ct_params = parse_options_header(content_type)
            # Fall back to the parser-wide charset if the part's Content-Type
            # has no ``charset`` parameter.
            return ct_params.get("charset", self.charset)

        return self.charset

    def start_file_streaming(
        self, event: File, total_content_length: t.Optional[int]
    ) -> t.IO[bytes]:
        # Open a new buffer (via the stream factory) that this file part's
        # data will be written into as it arrives.
        content_type = event.headers.get("content-type")

        try:
            content_length = int(event.headers["content-length"])
        except (KeyError, ValueError):
            # Missing or malformed per-part Content-Length: report 0.
            content_length = 0

        container = self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=content_length,
        )
        return container

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: t.Optional[int]
    ) -> t.Tuple[MultiDict, MultiDict]:
        """Parse ``stream`` and return ``(form, files)`` multidicts.

        :param stream: the raw multipart body.
        :param boundary: the multipart boundary from the Content-Type header.
        :param content_length: total body length, used to limit reading and
            passed on to the stream factory.
        """
        # ``container`` buffers the current part: a list of byte chunks for a
        # form field, or a writable stream (from the factory) for a file.
        container: t.Union[t.IO[bytes], t.List[bytes]]
        _write: t.Callable[[bytes], t.Any]

        # A trailing ``None`` sentinel tells the decoder the input is done.
        iterator = chain(
            _make_chunk_iter(
                stream,
                limit=content_length,
                buffer_size=self.buffer_size,
            ),
            [None],
        )

        parser = MultipartDecoder(
            boundary, self.max_form_memory_size, max_parts=self.max_form_parts
        )

        fields = []
        files = []

        current_part: Union[Field, File]
        for data in iterator:
            parser.receive_data(data)
            event = parser.next_event()
            # Drain all events available for this chunk; NeedData means feed
            # more input, Epilogue means the body is fully parsed.
            while not isinstance(event, (Epilogue, NeedData)):
                if isinstance(event, Field):
                    current_part = event
                    container = []
                    _write = container.append
                elif isinstance(event, File):
                    current_part = event
                    container = self.start_file_streaming(event, content_length)
                    _write = container.write
                elif isinstance(event, Data):
                    _write(event.data)
                    if not event.more_data:
                        # Part complete: finalize it as a field or a file.
                        if isinstance(current_part, Field):
                            value = b"".join(container).decode(
                                self.get_part_charset(current_part.headers), self.errors
                            )
                            fields.append((current_part.name, value))
                        else:
                            container = t.cast(t.IO[bytes], container)
                            # Rewind so callers can read the upload from the start.
                            container.seek(0)
                            files.append(
                                (
                                    current_part.name,
                                    FileStorage(
                                        container,
                                        current_part.filename,
                                        current_part.name,
                                        headers=current_part.headers,
                                    ),
                                )
                            )

                event = parser.next_event()

        return self.cls(fields), self.cls(files)