Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/formparser.py: 28%
169 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1import typing as t
2from functools import update_wrapper
3from io import BytesIO
4from itertools import chain
5from typing import Union
7from . import exceptions
8from .datastructures import FileStorage
9from .datastructures import Headers
10from .datastructures import MultiDict
11from .http import parse_options_header
12from .sansio.multipart import Data
13from .sansio.multipart import Epilogue
14from .sansio.multipart import Field
15from .sansio.multipart import File
16from .sansio.multipart import MultipartDecoder
17from .sansio.multipart import NeedData
18from .urls import url_decode_stream
19from .wsgi import _make_chunk_iter
20from .wsgi import get_content_length
21from .wsgi import get_input_stream
23# there are some platforms where SpooledTemporaryFile is not available.
24# In that case we need to provide a fallback.
25try:
26 from tempfile import SpooledTemporaryFile
27except ImportError:
28 from tempfile import TemporaryFile
30 SpooledTemporaryFile = None # type: ignore
32if t.TYPE_CHECKING:
33 import typing as te
34 from _typeshed.wsgi import WSGIEnvironment
36 t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]
38 class TStreamFactory(te.Protocol):
39 def __call__(
40 self,
41 total_content_length: t.Optional[int],
42 content_type: t.Optional[str],
43 filename: t.Optional[str],
44 content_length: t.Optional[int] = None,
45 ) -> t.IO[bytes]:
46 ...
49F = t.TypeVar("F", bound=t.Callable[..., t.Any])
52def _exhaust(stream: t.IO[bytes]) -> None:
53 bts = stream.read(64 * 1024)
54 while bts:
55 bts = stream.read(64 * 1024)
58def default_stream_factory(
59 total_content_length: t.Optional[int],
60 content_type: t.Optional[str],
61 filename: t.Optional[str],
62 content_length: t.Optional[int] = None,
63) -> t.IO[bytes]:
64 max_size = 1024 * 500
66 if SpooledTemporaryFile is not None:
67 return t.cast(t.IO[bytes], SpooledTemporaryFile(max_size=max_size, mode="rb+"))
68 elif total_content_length is None or total_content_length > max_size:
69 return t.cast(t.IO[bytes], TemporaryFile("rb+"))
71 return BytesIO()
74def parse_form_data(
75 environ: "WSGIEnvironment",
76 stream_factory: t.Optional["TStreamFactory"] = None,
77 charset: str = "utf-8",
78 errors: str = "replace",
79 max_form_memory_size: t.Optional[int] = None,
80 max_content_length: t.Optional[int] = None,
81 cls: t.Optional[t.Type[MultiDict]] = None,
82 silent: bool = True,
83) -> "t_parse_result":
84 """Parse the form data in the environ and return it as tuple in the form
85 ``(stream, form, files)``. You should only call this method if the
86 transport method is `POST`, `PUT`, or `PATCH`.
88 If the mimetype of the data transmitted is `multipart/form-data` the
89 files multidict will be filled with `FileStorage` objects. If the
90 mimetype is unknown the input stream is wrapped and returned as first
91 argument, else the stream is empty.
93 This is a shortcut for the common usage of :class:`FormDataParser`.
95 Have a look at :doc:`/request_data` for more details.
97 .. versionadded:: 0.5
98 The `max_form_memory_size`, `max_content_length` and
99 `cls` parameters were added.
101 .. versionadded:: 0.5.1
102 The optional `silent` flag was added.
104 :param environ: the WSGI environment to be used for parsing.
105 :param stream_factory: An optional callable that returns a new read and
106 writeable file descriptor. This callable works
107 the same as :meth:`Response._get_file_stream`.
108 :param charset: The character set for URL and url encoded form data.
109 :param errors: The encoding error behavior.
110 :param max_form_memory_size: the maximum number of bytes to be accepted for
111 in-memory stored form data. If the data
112 exceeds the value specified an
113 :exc:`~exceptions.RequestEntityTooLarge`
114 exception is raised.
115 :param max_content_length: If this is provided and the transmitted data
116 is longer than this value an
117 :exc:`~exceptions.RequestEntityTooLarge`
118 exception is raised.
119 :param cls: an optional dict class to use. If this is not specified
120 or `None` the default :class:`MultiDict` is used.
121 :param silent: If set to False parsing errors will not be caught.
122 :return: A tuple in the form ``(stream, form, files)``.
123 """
124 return FormDataParser(
125 stream_factory,
126 charset,
127 errors,
128 max_form_memory_size,
129 max_content_length,
130 cls,
131 silent,
132 ).parse_from_environ(environ)
135def exhaust_stream(f: F) -> F:
136 """Helper decorator for methods that exhausts the stream on return."""
138 def wrapper(self, stream, *args, **kwargs): # type: ignore
139 try:
140 return f(self, stream, *args, **kwargs)
141 finally:
142 exhaust = getattr(stream, "exhaust", None)
144 if exhaust is not None:
145 exhaust()
146 else:
147 while True:
148 chunk = stream.read(1024 * 64)
150 if not chunk:
151 break
153 return update_wrapper(t.cast(F, wrapper), f)
156class FormDataParser:
157 """This class implements parsing of form data for Werkzeug. By itself
158 it can parse multipart and url encoded form data. It can be subclassed
159 and extended but for most mimetypes it is a better idea to use the
160 untouched stream and expose it as separate attributes on a request
161 object.
163 .. versionadded:: 0.8
165 :param stream_factory: An optional callable that returns a new read and
166 writeable file descriptor. This callable works
167 the same as :meth:`Response._get_file_stream`.
168 :param charset: The character set for URL and url encoded form data.
169 :param errors: The encoding error behavior.
170 :param max_form_memory_size: the maximum number of bytes to be accepted for
171 in-memory stored form data. If the data
172 exceeds the value specified an
173 :exc:`~exceptions.RequestEntityTooLarge`
174 exception is raised.
175 :param max_content_length: If this is provided and the transmitted data
176 is longer than this value an
177 :exc:`~exceptions.RequestEntityTooLarge`
178 exception is raised.
179 :param cls: an optional dict class to use. If this is not specified
180 or `None` the default :class:`MultiDict` is used.
181 :param silent: If set to False parsing errors will not be caught.
182 """
184 def __init__(
185 self,
186 stream_factory: t.Optional["TStreamFactory"] = None,
187 charset: str = "utf-8",
188 errors: str = "replace",
189 max_form_memory_size: t.Optional[int] = None,
190 max_content_length: t.Optional[int] = None,
191 cls: t.Optional[t.Type[MultiDict]] = None,
192 silent: bool = True,
193 ) -> None:
194 if stream_factory is None:
195 stream_factory = default_stream_factory
197 self.stream_factory = stream_factory
198 self.charset = charset
199 self.errors = errors
200 self.max_form_memory_size = max_form_memory_size
201 self.max_content_length = max_content_length
203 if cls is None:
204 cls = MultiDict
206 self.cls = cls
207 self.silent = silent
209 def get_parse_func(
210 self, mimetype: str, options: t.Dict[str, str]
211 ) -> t.Optional[
212 t.Callable[
213 ["FormDataParser", t.IO[bytes], str, t.Optional[int], t.Dict[str, str]],
214 "t_parse_result",
215 ]
216 ]:
217 return self.parse_functions.get(mimetype)
219 def parse_from_environ(self, environ: "WSGIEnvironment") -> "t_parse_result":
220 """Parses the information from the environment as form data.
222 :param environ: the WSGI environment to be used for parsing.
223 :return: A tuple in the form ``(stream, form, files)``.
224 """
225 content_type = environ.get("CONTENT_TYPE", "")
226 content_length = get_content_length(environ)
227 mimetype, options = parse_options_header(content_type)
228 return self.parse(get_input_stream(environ), mimetype, content_length, options)
230 def parse(
231 self,
232 stream: t.IO[bytes],
233 mimetype: str,
234 content_length: t.Optional[int],
235 options: t.Optional[t.Dict[str, str]] = None,
236 ) -> "t_parse_result":
237 """Parses the information from the given stream, mimetype,
238 content length and mimetype parameters.
240 :param stream: an input stream
241 :param mimetype: the mimetype of the data
242 :param content_length: the content length of the incoming data
243 :param options: optional mimetype parameters (used for
244 the multipart boundary for instance)
245 :return: A tuple in the form ``(stream, form, files)``.
246 """
247 if (
248 self.max_content_length is not None
249 and content_length is not None
250 and content_length > self.max_content_length
251 ):
252 # if the input stream is not exhausted, firefox reports Connection Reset
253 _exhaust(stream)
254 raise exceptions.RequestEntityTooLarge()
256 if options is None:
257 options = {}
259 parse_func = self.get_parse_func(mimetype, options)
261 if parse_func is not None:
262 try:
263 return parse_func(self, stream, mimetype, content_length, options)
264 except ValueError:
265 if not self.silent:
266 raise
268 return stream, self.cls(), self.cls()
270 @exhaust_stream
271 def _parse_multipart(
272 self,
273 stream: t.IO[bytes],
274 mimetype: str,
275 content_length: t.Optional[int],
276 options: t.Dict[str, str],
277 ) -> "t_parse_result":
278 parser = MultiPartParser(
279 self.stream_factory,
280 self.charset,
281 self.errors,
282 max_form_memory_size=self.max_form_memory_size,
283 cls=self.cls,
284 )
285 boundary = options.get("boundary", "").encode("ascii")
287 if not boundary:
288 raise ValueError("Missing boundary")
290 form, files = parser.parse(stream, boundary, content_length)
291 return stream, form, files
293 @exhaust_stream
294 def _parse_urlencoded(
295 self,
296 stream: t.IO[bytes],
297 mimetype: str,
298 content_length: t.Optional[int],
299 options: t.Dict[str, str],
300 ) -> "t_parse_result":
301 if (
302 self.max_form_memory_size is not None
303 and content_length is not None
304 and content_length > self.max_form_memory_size
305 ):
306 # if the input stream is not exhausted, firefox reports Connection Reset
307 _exhaust(stream)
308 raise exceptions.RequestEntityTooLarge()
310 form = url_decode_stream(stream, self.charset, errors=self.errors, cls=self.cls)
311 return stream, form, self.cls()
313 #: mapping of mimetypes to parsing functions
314 parse_functions: t.Dict[
315 str,
316 t.Callable[
317 ["FormDataParser", t.IO[bytes], str, t.Optional[int], t.Dict[str, str]],
318 "t_parse_result",
319 ],
320 ] = {
321 "multipart/form-data": _parse_multipart,
322 "application/x-www-form-urlencoded": _parse_urlencoded,
323 "application/x-url-encoded": _parse_urlencoded,
324 }
327def _line_parse(line: str) -> t.Tuple[str, bool]:
328 """Removes line ending characters and returns a tuple (`stripped_line`,
329 `is_terminated`).
330 """
331 if line[-2:] == "\r\n":
332 return line[:-2], True
334 elif line[-1:] in {"\r", "\n"}:
335 return line[:-1], True
337 return line, False
340class MultiPartParser:
341 def __init__(
342 self,
343 stream_factory: t.Optional["TStreamFactory"] = None,
344 charset: str = "utf-8",
345 errors: str = "replace",
346 max_form_memory_size: t.Optional[int] = None,
347 cls: t.Optional[t.Type[MultiDict]] = None,
348 buffer_size: int = 64 * 1024,
349 ) -> None:
350 self.charset = charset
351 self.errors = errors
352 self.max_form_memory_size = max_form_memory_size
354 if stream_factory is None:
355 stream_factory = default_stream_factory
357 self.stream_factory = stream_factory
359 if cls is None:
360 cls = MultiDict
362 self.cls = cls
364 self.buffer_size = buffer_size
366 def fail(self, message: str) -> "te.NoReturn":
367 raise ValueError(message)
369 def get_part_charset(self, headers: Headers) -> str:
370 # Figure out input charset for current part
371 content_type = headers.get("content-type")
373 if content_type:
374 mimetype, ct_params = parse_options_header(content_type)
375 return ct_params.get("charset", self.charset)
377 return self.charset
379 def start_file_streaming(
380 self, event: File, total_content_length: t.Optional[int]
381 ) -> t.IO[bytes]:
382 content_type = event.headers.get("content-type")
384 try:
385 content_length = int(event.headers["content-length"])
386 except (KeyError, ValueError):
387 content_length = 0
389 container = self.stream_factory(
390 total_content_length=total_content_length,
391 filename=event.filename,
392 content_type=content_type,
393 content_length=content_length,
394 )
395 return container
397 def parse(
398 self, stream: t.IO[bytes], boundary: bytes, content_length: t.Optional[int]
399 ) -> t.Tuple[MultiDict, MultiDict]:
400 container: t.Union[t.IO[bytes], t.List[bytes]]
401 _write: t.Callable[[bytes], t.Any]
403 iterator = chain(
404 _make_chunk_iter(
405 stream,
406 limit=content_length,
407 buffer_size=self.buffer_size,
408 ),
409 [None],
410 )
412 parser = MultipartDecoder(boundary, self.max_form_memory_size)
414 fields = []
415 files = []
417 current_part: Union[Field, File]
418 for data in iterator:
419 parser.receive_data(data)
420 event = parser.next_event()
421 while not isinstance(event, (Epilogue, NeedData)):
422 if isinstance(event, Field):
423 current_part = event
424 container = []
425 _write = container.append
426 elif isinstance(event, File):
427 current_part = event
428 container = self.start_file_streaming(event, content_length)
429 _write = container.write
430 elif isinstance(event, Data):
431 _write(event.data)
432 if not event.more_data:
433 if isinstance(current_part, Field):
434 value = b"".join(container).decode(
435 self.get_part_charset(current_part.headers), self.errors
436 )
437 fields.append((current_part.name, value))
438 else:
439 container = t.cast(t.IO[bytes], container)
440 container.seek(0)
441 files.append(
442 (
443 current_part.name,
444 FileStorage(
445 container,
446 current_part.filename,
447 current_part.name,
448 headers=current_part.headers,
449 ),
450 )
451 )
453 event = parser.next_event()
455 return self.cls(fields), self.cls(files)