Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/formparser.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

168 statements  

1from __future__ import annotations 

2 

3import typing as t 

4from tempfile import SpooledTemporaryFile 

5from types import TracebackType 

6from urllib.parse import parse_qsl 

7 

8from ._internal import _plain_int 

9from .datastructures import FileStorage 

10from .datastructures import Headers 

11from .datastructures import ImmutableMultiDict 

12from .datastructures import MultiDict 

13from .exceptions import RequestEntityTooLarge 

14from .http import parse_options_header 

15from .sansio.multipart import Data 

16from .sansio.multipart import Epilogue 

17from .sansio.multipart import Field 

18from .sansio.multipart import File 

19from .sansio.multipart import MultipartDecoder 

20from .sansio.multipart import NeedData 

21from .wsgi import get_content_length 

22from .wsgi import get_input_stream 

23 

if t.TYPE_CHECKING:
    # Names in this branch exist only for static type checkers; they are
    # referenced from annotations elsewhere in the module.
    import typing_extensions as te
    from _typeshed.wsgi import WSGIEnvironment

    # Result shape shared by the parsers below: ``(stream, form, files)``.
    t_parse_result = tuple[
        t.IO[bytes], MultiDict[str, str], MultiDict[str, FileStorage]
    ]

    class TStreamFactory(te.Protocol):
        """Call signature expected of a ``stream_factory`` callable."""

        def __call__(
            self,
            total_content_length: int | None,
            content_type: str | None,
            filename: str | None,
            content_length: int | None = None,
        ) -> t.IO[bytes]: ...


F = t.TypeVar("F", bound=t.Callable[..., t.Any])

43 

44 

def default_stream_factory(
    total_content_length: int | None,
    content_type: str | None,
    filename: str | None,
    content_length: int | None = None,
) -> t.IO[bytes]:
    """Return a fresh temporary binary file to receive one uploaded file.

    The arguments are accepted so that this function matches the
    ``stream_factory`` call signature, but none of them is consulted here.

    :param total_content_length: unused.
    :param content_type: unused.
    :param filename: unused.
    :param content_length: unused.
    :return: A ``SpooledTemporaryFile`` opened in ``"rb+"`` mode with a
        ``max_size`` of ``1024 * 500`` bytes.
    """
    return SpooledTemporaryFile(max_size=1024 * 500, mode="rb+")

52 

53 

def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: TStreamFactory | None = None,
    max_form_memory_size: int | None = None,
    max_content_length: int | None = None,
    silent: bool = True,
    *,
    max_form_parts: int | None = None,
    **kwargs: t.Any,
) -> t_parse_result:
    """Parse the form data in the environ and return it as tuple in the form
    ``(stream, form, files)``. You should only call this method if the
    transport method is `POST`, `PUT`, or `PATCH`.

    If the mimetype of the data transmitted is `multipart/form-data` the
    files multidict will be filled with `FileStorage` objects. If the
    mimetype is unknown the input stream is wrapped and returned as first
    argument, else the stream is empty.

    This is a shortcut for the common usage of :class:`FormDataParser`.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be parsed. If this
        is exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :return: A tuple in the form ``(stream, form, files)``.

    .. versionchanged:: 3.2
        The ``cls`` parameter is deprecated and will be removed in Werkzeug 3.3. It will
        always be ``ImmutableMultiDict``.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.5.1
       Added the ``silent`` parameter.

    .. versionadded:: 0.5
       Added the ``max_form_memory_size``, ``max_content_length``, and ``cls``
       parameters.
    """
    parser_kwargs: dict[str, t.Any] = dict(
        stream_factory=stream_factory,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        max_form_parts=max_form_parts,
        silent=silent,
    )

    # ``cls`` is only accepted through ``**kwargs`` so that its mere presence
    # can be detected and reported as deprecated. It is forwarded only when
    # the caller actually supplied it.
    if "cls" in kwargs:
        import warnings

        warnings.warn(
            "The 'cls' parameter is deprecated and will be removed in Werkzeug 3.3."
            " It will always be 'ImmutableMultiDict'.",
            DeprecationWarning,
            stacklevel=2,
        )
        parser_kwargs["cls"] = kwargs["cls"]

    return FormDataParser(**parser_kwargs).parse_from_environ(environ)

130 

131 

class FormDataParser:
    """This class implements parsing of form data for Werkzeug. By itself
    it can parse multipart and url encoded form data. It can be subclassed
    and extended but for most mimetypes it is a better idea to use the
    untouched stream and expose it as separate attributes on a request
    object.

    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be parsed. If this
        is exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.

    .. versionchanged:: 3.2
        The ``cls`` parameter and attribute are deprecated and will be removed
        in Werkzeug 3.3. They will always be ``ImmutableMultiDict``.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 3.0
        The ``parse_functions`` attribute and ``get_parse_func`` methods were removed.

    .. versionchanged:: 2.2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.8
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        max_content_length: int | None = None,
        silent: bool = True,
        *,
        max_form_parts: int | None = None,
        **kwargs: t.Any,
    ) -> None:
        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts

        # ``cls`` is taken from ``**kwargs`` so that passing it at all can be
        # detected and reported as deprecated.
        if "cls" in kwargs:
            import warnings

            warnings.warn(
                "The 'cls' parameter is deprecated and will be removed in Werkzeug 3.3."
                " It will always be 'ImmutableMultiDict'.",
                DeprecationWarning,
                stacklevel=2,
            )

        # ``None`` means "use ``ImmutableMultiDict``" wherever a container is built.
        self.cls: type[ImmutableMultiDict[str, t.Any]] | None = kwargs.get("cls")
        self.silent = silent

    def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
        """Parses the information from the environment as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        stream = get_input_stream(environ, max_content_length=self.max_content_length)
        content_length = get_content_length(environ)
        mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
        return self.parse(
            stream,
            content_length=content_length,
            mimetype=mimetype,
            options=options,
        )

    def _empty_result(self, stream: t.IO[bytes]) -> t_parse_result:
        """Return ``stream`` together with empty form and files containers."""
        if self.cls is not None:
            return stream, self.cls(), self.cls()

        return stream, ImmutableMultiDict(), ImmutableMultiDict()

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str] | None = None,
    ) -> t_parse_result:
        """Parses the information from the given stream, mimetype,
        content length and mimetype parameters.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for
                        the multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.

        .. versionchanged:: 3.0
            The invalid ``application/x-url-encoded`` content type is not
            treated as ``application/x-www-form-urlencoded``.
        """
        if mimetype == "multipart/form-data":
            parse_func = self._parse_multipart
        elif mimetype == "application/x-www-form-urlencoded":
            parse_func = self._parse_urlencoded
        else:
            # Unknown mimetype: hand the stream back untouched.
            return self._empty_result(stream)

        if options is None:
            options = {}

        try:
            return parse_func(stream, mimetype, content_length, options)
        except ValueError:
            if not self.silent:
                raise

        # A ``ValueError`` was swallowed because ``silent`` is set.
        return self._empty_result(stream)

    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse a ``multipart/form-data`` body using :class:`MultiPartParser`.

        :raises ValueError: if ``options`` carries no (or an empty) ``boundary``.
        """
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        kwargs: dict[str, t.Any] = dict(
            stream_factory=self.stream_factory,
            max_form_memory_size=self.max_form_memory_size,
            max_form_parts=self.max_form_parts,
        )

        # Forward ``cls`` only when it was set, so the deprecation warning in
        # ``MultiPartParser`` is not triggered for the default case.
        if self.cls is not None:
            kwargs["cls"] = self.cls

        with MultiPartParser(**kwargs) as parser:
            form, files = parser.parse(stream, boundary, content_length)

        return stream, form, files

    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse an ``application/x-www-form-urlencoded`` body.

        The whole stream is read and decoded, then split with ``parse_qsl``
        keeping blank values.

        :raises RequestEntityTooLarge: if both ``max_form_memory_size`` and
            ``content_length`` are known and the latter is larger.
        """
        if (
            self.max_form_memory_size is not None
            and content_length is not None
            and content_length > self.max_form_memory_size
        ):
            raise RequestEntityTooLarge()

        items = parse_qsl(
            stream.read().decode(),
            keep_blank_values=True,
            errors="werkzeug.url_quote",
        )

        if self.cls is not None:
            return stream, self.cls(items), self.cls()

        return stream, ImmutableMultiDict(items), ImmutableMultiDict()

313 

314 

class MultiPartParser:
    """Drive a ``MultipartDecoder`` over a byte stream and collect the
    resulting form fields and uploaded files.

    The instance is a context manager: if the ``with`` body exits with an
    exception, every container opened through ``stream_factory`` is closed.

    :param stream_factory: callable creating the container for each uploaded
        file; defaults to :func:`default_stream_factory`.
    :param max_form_memory_size: limit in bytes applied to the accumulated
        data of a single non-file field; ``None`` disables the check here.
    :param buffer_size: number of bytes requested from the stream per read.
    :param max_form_parts: forwarded to ``MultipartDecoder`` as ``max_parts``.
    :param kwargs: only the deprecated ``cls`` key is consulted.
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: int | None = None,
        **kwargs: t.Any,
    ) -> None:
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts

        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory
        # Every container handed out by ``start_file_streaming``; used by
        # ``__exit__`` for cleanup on error.
        self._files: list[t.IO[bytes]] = []

        if "cls" in kwargs:
            import warnings

            warnings.warn(
                "The 'cls' parameter is deprecated and will be removed in Werkzeug 3.3."
                " It will always be 'ImmutableMultiDict'.",
                DeprecationWarning,
                stacklevel=2,
            )

        # ``None`` means "use ``ImmutableMultiDict``" for the results.
        self.cls: type[ImmutableMultiDict[str, t.Any]] | None = kwargs.get("cls")
        self.buffer_size = buffer_size

    def __enter__(self) -> te.Self:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Close the containers only on failure. On success they are left open
        # because ``parse`` has wrapped them in ``FileStorage`` objects that
        # are returned to the caller.
        if exc_val is not None:
            for file in self._files:
                file.close()

    def get_part_charset(self, headers: Headers) -> str:
        """Return the charset to decode a field with.

        The ``charset`` parameter of the part's ``Content-Type`` header is
        used when it is one of a fixed safe list; otherwise ``"utf-8"``.
        """
        # Figure out input charset for current part
        content_type = headers.get("Content-Type")

        if content_type:
            parameters = parse_options_header(content_type)[1]
            ct_charset = parameters.get("charset", "").lower()

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further.
            if ct_charset in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                return ct_charset

        return "utf-8"

    def start_file_streaming(
        self, event: File, total_content_length: int | None
    ) -> t.IO[bytes]:
        """Create, register and return the container for one uploaded file.

        :param event: the ``File`` event announcing the part.
        :param total_content_length: content length of the whole request body,
            passed through to ``stream_factory``.
        """
        content_type = event.headers.get("Content-Type")

        try:
            content_length = _plain_int(event.headers["Content-Length"])
        except (KeyError, ValueError):
            # Header missing or rejected by ``_plain_int``.
            content_length = 0

        container = self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=content_length,
        )
        self._files.append(container)
        return container

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
    ) -> tuple[MultiDict[str, str], MultiDict[str, FileStorage]]:
        """Read ``stream`` in ``buffer_size`` chunks and decode it.

        :param stream: the request body.
        :param boundary: the multipart boundary.
        :param content_length: content length of the whole body; only passed
            on to ``stream_factory`` for file parts.
        :return: ``(form, files)`` multidicts.
        :raises RequestEntityTooLarge: if the accumulated data of one
            non-file field exceeds ``max_form_memory_size``.
        """
        current_part: Field | File
        # Running size of the current field; ``None`` while a file part is
        # being received, which disables the in-memory limit for that part.
        field_size: int | None = None
        container: t.IO[bytes] | list[bytes]
        _write: t.Callable[[bytes], t.Any]

        parser = MultipartDecoder(
            boundary,
            max_form_memory_size=self.max_form_memory_size,
            max_parts=self.max_form_parts,
        )

        fields: list[tuple[str, str]] = []
        files: list[tuple[str, FileStorage]] = []

        for data in _chunk_iter(stream.read, self.buffer_size):
            parser.receive_data(data)
            event = parser.next_event()
            while not isinstance(event, (Epilogue, NeedData)):
                if isinstance(event, Field):
                    # Field data is collected in memory as a list of chunks.
                    current_part = event
                    field_size = 0
                    container = []
                    _write = container.append
                elif isinstance(event, File):
                    # File data is written to a container from the factory.
                    current_part = event
                    field_size = None
                    container = self.start_file_streaming(event, content_length)
                    _write = container.write
                elif isinstance(event, Data):
                    if self.max_form_memory_size is not None and field_size is not None:
                        # Ensure that accumulated data events do not exceed limit.
                        # Also checked within single event in MultipartDecoder.
                        field_size += len(event.data)

                        if field_size > self.max_form_memory_size:
                            raise RequestEntityTooLarge()

                    _write(event.data)
                    if not event.more_data:
                        # Last chunk of the part: finalize it.
                        if isinstance(current_part, Field):
                            value = b"".join(container).decode(
                                self.get_part_charset(current_part.headers), "replace"
                            )
                            fields.append((current_part.name, value))
                        else:
                            container = t.cast(t.IO[bytes], container)
                            container.seek(0)
                            files.append(
                                (
                                    current_part.name,
                                    FileStorage(
                                        container,
                                        current_part.filename,
                                        current_part.name,
                                        headers=current_part.headers,
                                    ),
                                )
                            )

                event = parser.next_event()

        if self.cls is not None:
            return self.cls(fields), self.cls(files)

        return ImmutableMultiDict(fields), ImmutableMultiDict(files)

461 

462 

def _chunk_iter(read: t.Callable[[int], bytes], size: int) -> t.Iterator[bytes | None]:
    """Read data in chunks for multipart/form-data parsing. Stop if no data is read.
    Yield ``None`` at the end to signal end of parsing.

    :param read: callable returning up to ``size`` bytes per call and an
        empty value once the input is exhausted.
    :param size: number of bytes requested per call.
    """
    while chunk := read(size):
        yield chunk

    # Trailing sentinel: always emitted, even when nothing was read.
    yield None