Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/formparser.py: 21%

187 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-09 06:08 +0000

1from __future__ import annotations 

2 

3import typing as t 

4import warnings 

5from io import BytesIO 

6from urllib.parse import parse_qsl 

7 

8from ._internal import _plain_int 

9from .datastructures import FileStorage 

10from .datastructures import Headers 

11from .datastructures import MultiDict 

12from .exceptions import RequestEntityTooLarge 

13from .http import parse_options_header 

14from .sansio.multipart import Data 

15from .sansio.multipart import Epilogue 

16from .sansio.multipart import Field 

17from .sansio.multipart import File 

18from .sansio.multipart import MultipartDecoder 

19from .sansio.multipart import NeedData 

20from .wsgi import get_content_length 

21from .wsgi import get_input_stream 

22 

23# there are some platforms where SpooledTemporaryFile is not available. 

24# In that case we need to provide a fallback. 

25try: 

26 from tempfile import SpooledTemporaryFile 

27except ImportError: 

28 from tempfile import TemporaryFile 

29 

30 SpooledTemporaryFile = None # type: ignore 

31 

if t.TYPE_CHECKING:
    # This branch is only seen by static type checkers; ``t.TYPE_CHECKING`` is
    # false at runtime, so none of the names below are bound when the module
    # is imported normally.
    import typing as te
    from _typeshed.wsgi import WSGIEnvironment

    # Result type shared by the parse functions: ``(stream, form, files)``.
    t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]

    class TStreamFactory(te.Protocol):
        """Call signature expected of a ``stream_factory`` callable.

        The callable receives what is known about the request and the file
        part and returns a binary stream.
        """

        def __call__(
            self,
            total_content_length: int | None,
            content_type: str | None,
            filename: str | None,
            content_length: int | None = None,
        ) -> t.IO[bytes]:
            ...

47 

48 

49F = t.TypeVar("F", bound=t.Callable[..., t.Any]) 

50 

51 

def default_stream_factory(
    total_content_length: int | None,
    content_type: str | None,
    filename: str | None,
    content_length: int | None = None,
) -> t.IO[bytes]:
    """Create the binary stream that will hold one uploaded file.

    If ``SpooledTemporaryFile`` could be imported it is always used, with a
    spool size of 500 KiB. Otherwise the stream type is picked from
    ``total_content_length``: a ``TemporaryFile`` when the length is unknown
    or above 500 KiB, and an in-memory ``BytesIO`` in every other case.

    :param total_content_length: length of the whole request body, or ``None``
        if it is not known.
    :param content_type: not used by this implementation.
    :param filename: not used by this implementation.
    :param content_length: not used by this implementation.
    :return: a new binary stream opened for reading and writing.
    """
    spool_limit = 1024 * 500

    if SpooledTemporaryFile is None:
        # Fallback for platforms without SpooledTemporaryFile: the decision
        # has to be made up front because the stream cannot roll over later.
        needs_disk = total_content_length is None or total_content_length > spool_limit

        if needs_disk:
            return t.cast(t.IO[bytes], TemporaryFile("rb+"))

        return BytesIO()

    return t.cast(
        t.IO[bytes], SpooledTemporaryFile(max_size=spool_limit, mode="rb+")
    )

66 

67 

def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: TStreamFactory | None = None,
    charset: str | None = None,
    errors: str | None = None,
    max_form_memory_size: int | None = None,
    max_content_length: int | None = None,
    cls: type[MultiDict] | None = None,
    silent: bool = True,
    *,
    max_form_parts: int | None = None,
) -> t_parse_result:
    """Parse the form data found in a WSGI environ in one call.

    This is a shortcut for the common usage of :class:`FormDataParser`: the
    arguments are forwarded to its constructor and the parser is then run on
    ``environ``. It should only be called when the transport method is
    `POST`, `PUT`, or `PATCH`.

    The result is a tuple ``(stream, form, files)``. For
    `multipart/form-data` the files multidict is filled with `FileStorage`
    objects. If the mimetype is unknown the input stream is wrapped and
    returned as the first item, otherwise the returned stream is empty.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param charset: Deprecated, see the 2.3 note below.
    :param errors: Deprecated, see the 2.3 note below.
    :param max_form_memory_size: the maximum number of bytes to be accepted
                                 for in-memory stored form data. If the data
                                 exceeds the value specified an
                                 :exc:`~exceptions.RequestEntityTooLarge`
                                 exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :return: A tuple in the form ``(stream, form, files)``.

    .. versionchanged:: 2.3
        Added the ``max_form_parts`` parameter.

    .. versionchanged:: 2.3
        The ``charset`` and ``errors`` parameters are deprecated and will be
        removed in Werkzeug 3.0.

    .. versionadded:: 0.5.1
       Added the ``silent`` parameter.

    .. versionadded:: 0.5
       Added the ``max_form_memory_size``, ``max_content_length``, and
       ``cls`` parameters.
    """
    form_parser = FormDataParser(
        stream_factory=stream_factory,
        charset=charset,
        errors=errors,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        max_form_parts=max_form_parts,
        silent=silent,
        cls=cls,
    )
    return form_parser.parse_from_environ(environ)

135 

136 

class FormDataParser:
    """Form data parser used by Werkzeug.

    Out of the box it understands multipart and url encoded bodies. The class
    can be subclassed and extended, but for most other mimetypes it is a
    better idea to leave the stream untouched and expose it as separate
    attributes on a request object.

    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
                                 for in-memory stored form data. If the data
                                 exceeds the value specified an
                                 :exc:`~exceptions.RequestEntityTooLarge`
                                 exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.

    .. versionchanged:: 2.3
        The ``charset`` and ``errors`` parameters are deprecated and will be
        removed in Werkzeug 3.0.

    .. versionchanged:: 2.3
        The ``parse_functions`` attribute and ``get_parse_func`` methods are
        deprecated and will be removed in Werkzeug 3.0.

    .. versionchanged:: 2.2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.8
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        charset: str | None = None,
        errors: str | None = None,
        max_form_memory_size: int | None = None,
        max_content_length: int | None = None,
        cls: type[MultiDict] | None = None,
        silent: bool = True,
        *,
        max_form_parts: int | None = None,
    ) -> None:
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )

        # An explicitly passed charset is honoured but deprecated; leaving it
        # out selects UTF-8 silently.
        if charset is None:
            charset = "utf-8"
        else:
            warnings.warn(
                "The 'charset' parameter is deprecated and will be"
                " removed in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.charset = charset

        # Same treatment for the decode error handler.
        if errors is None:
            errors = "replace"
        else:
            warnings.warn(
                "The 'errors' parameter is deprecated and will be"
                " removed in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts
        self.cls = MultiDict if cls is None else cls
        self.silent = silent

    def get_parse_func(
        self, mimetype: str, options: dict[str, str]
    ) -> None | (
        t.Callable[
            [FormDataParser, t.IO[bytes], str, int | None, dict[str, str]],
            t_parse_result,
        ]
    ):
        """Look up the unbound parse function for ``mimetype``.

        Deprecated: every call emits a :exc:`DeprecationWarning`.

        :param mimetype: the mimetype to find a parse function for.
        :param options: mimetype parameters; not used for the lookup.
        :return: the function taken from the class (or from
            ``parse_functions``), or ``None`` if the mimetype is not handled.
        """
        warnings.warn(
            "The 'get_parse_func' method is deprecated and will be"
            " removed in Werkzeug 3.0.",
            DeprecationWarning,
            stacklevel=2,
        )
        owner = type(self)

        if mimetype == "multipart/form-data":
            return owner._parse_multipart

        if mimetype == "application/x-www-form-urlencoded":
            return owner._parse_urlencoded

        if mimetype == "application/x-url-encoded":
            warnings.warn(
                "The 'application/x-url-encoded' mimetype is invalid, and will not be"
                " treated as 'application/x-www-form-urlencoded' in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )
            return owner._parse_urlencoded

        # The built-in mimetypes above always win over registered functions.
        if mimetype in self.parse_functions:
            warnings.warn(
                "The 'parse_functions' attribute is deprecated and will be removed in"
                " Werkzeug 3.0. Override 'parse' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            return self.parse_functions[mimetype]

        return None

    def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
        """Parse the form data described by a WSGI environment.

        The input stream, content length and ``CONTENT_TYPE`` header are read
        from ``environ`` and handed to :meth:`parse`.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        body = get_input_stream(environ, max_content_length=self.max_content_length)
        declared_length = get_content_length(environ)
        mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
        # Keyword arguments are used so that overrides of ``parse`` keep working.
        return self.parse(
            body,
            content_length=declared_length,
            mimetype=mimetype,
            options=options,
        )

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str] | None = None,
    ) -> t_parse_result:
        """Parse ``stream`` according to ``mimetype``.

        A mimetype that is not handled yields the stream together with two
        empty ``cls`` instances. A :exc:`ValueError` raised while parsing
        gives the same result when ``silent`` is set, and is re-raised
        otherwise.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for
                        the multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.

        .. versionchanged:: 2.3
            The ``application/x-url-encoded`` content type is deprecated and
            will not be treated as ``application/x-www-form-urlencoded`` in
            Werkzeug 3.0.
        """
        if mimetype == "multipart/form-data":
            handler = self._parse_multipart
        elif mimetype == "application/x-www-form-urlencoded":
            handler = self._parse_urlencoded
        elif mimetype == "application/x-url-encoded":
            warnings.warn(
                "The 'application/x-url-encoded' mimetype is invalid, and will not be"
                " treated as 'application/x-www-form-urlencoded' in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )
            handler = self._parse_urlencoded
        elif mimetype in self.parse_functions:
            warnings.warn(
                "The 'parse_functions' attribute is deprecated and will be removed in"
                " Werkzeug 3.0. Override 'parse' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            # Registered functions are stored unbound; bind to this instance.
            handler = self.parse_functions[mimetype].__get__(self, type(self))
        else:
            return stream, self.cls(), self.cls()

        options = {} if options is None else options

        try:
            return handler(stream, mimetype, content_length, options)
        except ValueError:
            if not self.silent:
                raise

        # Silent mode: report a parsing failure as "no form data".
        return stream, self.cls(), self.cls()

    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse a ``multipart/form-data`` body with :class:`MultiPartParser`.

        :raises ValueError: if ``options`` carries no (or an empty) boundary.
        """
        # Only forward non-default values so that MultiPartParser does not
        # emit its own deprecation warnings for the defaults.
        charset = None if self.charset == "utf-8" else self.charset
        errors = None if self.errors == "replace" else self.errors
        multipart = MultiPartParser(
            stream_factory=self.stream_factory,
            charset=charset,
            errors=errors,
            max_form_memory_size=self.max_form_memory_size,
            max_form_parts=self.max_form_parts,
            cls=self.cls,
        )
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        form, files = multipart.parse(stream, boundary, content_length)
        return stream, form, files

    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse a URL encoded body into a ``cls`` instance.

        :raises RequestEntityTooLarge: if both ``max_form_memory_size`` and
            ``content_length`` are known and the length exceeds the limit, or
            if reading, decoding or splitting the body raises
            :exc:`ValueError`.
        """
        limit = self.max_form_memory_size

        if (
            limit is not None
            and content_length is not None
            and content_length > limit
        ):
            raise RequestEntityTooLarge()

        try:
            pairs = parse_qsl(
                stream.read().decode(),
                keep_blank_values=True,
                encoding=self.charset,
                errors="werkzeug.url_quote",
            )
        except ValueError as e:
            raise RequestEntityTooLarge() from e

        return stream, self.cls(pairs), self.cls()

    # Deprecated registry of extra parse functions, keyed by mimetype.
    parse_functions: dict[
        str,
        t.Callable[
            [FormDataParser, t.IO[bytes], str, int | None, dict[str, str]],
            t_parse_result,
        ],
    ] = {}

393 

394 

class MultiPartParser:
    """Parse a ``multipart/form-data`` stream into form fields and files.

    The body is read in ``buffer_size`` chunks and fed to a
    ``MultipartDecoder``. Field parts are collected in memory and decoded to
    text, file parts are written to streams obtained from ``stream_factory``
    and wrapped in ``FileStorage``.

    :param stream_factory: callable that returns the stream for a file part;
        ``default_stream_factory`` is used when omitted.
    :param charset: Deprecated. Charset for field values that do not declare
        an accepted one; defaults to ``"utf-8"``.
    :param errors: Deprecated. Error handler used when decoding field values;
        defaults to ``"replace"``.
    :param max_form_memory_size: passed on to the decoder.
    :param cls: container class for the results; ``MultiDict`` when omitted.
    :param buffer_size: number of bytes requested per read of the stream.
    :param max_form_parts: passed on to the decoder as ``max_parts``.
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        charset: str | None = None,
        errors: str | None = None,
        max_form_memory_size: int | None = None,
        cls: type[MultiDict] | None = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: int | None = None,
    ) -> None:
        if charset is None:
            charset = "utf-8"
        else:
            warnings.warn(
                "The 'charset' parameter is deprecated and will be"
                " removed in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.charset = charset

        if errors is None:
            errors = "replace"
        else:
            warnings.warn(
                "The 'errors' parameter is deprecated and will be"
                " removed in Werkzeug 3.0.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )
        self.cls = MultiDict if cls is None else cls
        self.buffer_size = buffer_size

    def fail(self, message: str) -> te.NoReturn:
        """Abort by raising :exc:`ValueError` with ``message``."""
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        """Return the charset to decode the current part with.

        The ``charset`` parameter of the part's ``content-type`` header is
        used if it is one of the accepted encodings; in every other case the
        parser's own charset is returned.
        """
        content_type = headers.get("content-type")

        if not content_type:
            return self.charset

        declared = parse_options_header(content_type)[1].get("charset", "").lower()

        # A safe list of encodings. Modern clients should only send ASCII or
        # UTF-8. This list will not be extended further.
        if declared in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
            return declared

        return self.charset

    def start_file_streaming(
        self, event: File, total_content_length: int | None
    ) -> t.IO[bytes]:
        """Ask the stream factory for a stream to store a file part in.

        The part's own ``content-length`` header is forwarded to the factory;
        ``0`` is passed when the header is missing or not a plain integer.
        """
        content_type = event.headers.get("content-type")

        try:
            part_length = _plain_int(event.headers["content-length"])
        except (KeyError, ValueError):
            part_length = 0

        return self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=part_length,
        )

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
    ) -> tuple[MultiDict, MultiDict]:
        """Read ``stream`` to the end and collect its fields and files.

        :param stream: the request body.
        :param boundary: the multipart boundary.
        :param content_length: length of the whole body, handed to the stream
            factory for every file part.
        :return: ``(form, files)``, both instances of ``cls``.
        """
        part: Field | File
        sink: t.IO[bytes] | list[bytes]
        write: t.Callable[[bytes], t.Any]

        decoder = MultipartDecoder(
            boundary,
            max_form_memory_size=self.max_form_memory_size,
            max_parts=self.max_form_parts,
        )
        form_items = []
        file_items = []

        for chunk in _chunk_iter(stream.read, self.buffer_size):
            decoder.receive_data(chunk)

            # Drain every event the decoder can produce from the data so far.
            while True:
                event = decoder.next_event()

                if isinstance(event, (Epilogue, NeedData)):
                    break

                if isinstance(event, Field):
                    # Field values are gathered as a list of byte chunks.
                    part = event
                    sink = []
                    write = sink.append
                elif isinstance(event, File):
                    part = event
                    sink = self.start_file_streaming(event, content_length)
                    write = sink.write
                elif isinstance(event, Data):
                    write(event.data)

                    if event.more_data:
                        continue

                    # The current part is complete: store it.
                    if isinstance(part, Field):
                        text = b"".join(sink).decode(
                            self.get_part_charset(part.headers), self.errors
                        )
                        form_items.append((part.name, text))
                    else:
                        sink = t.cast(t.IO[bytes], sink)
                        # Rewind so consumers read the file from the start.
                        sink.seek(0)
                        file_items.append(
                            (
                                part.name,
                                FileStorage(
                                    sink,
                                    part.filename,
                                    part.name,
                                    headers=part.headers,
                                ),
                            )
                        )

        return self.cls(form_items), self.cls(file_items)

533 

534 

535def _chunk_iter(read: t.Callable[[int], bytes], size: int) -> t.Iterator[bytes | None]: 

536 """Read data in chunks for multipart/form-data parsing. Stop if no data is read. 

537 Yield ``None`` at the end to signal end of parsing. 

538 """ 

539 while True: 

540 data = read(size) 

541 

542 if not data: 

543 break 

544 

545 yield data 

546 

547 yield None