Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/formparser.py: 27%

171 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1import typing as t 

2from functools import update_wrapper 

3from io import BytesIO 

4from itertools import chain 

5from typing import Union 

6 

7from . import exceptions 

8from .datastructures import FileStorage 

9from .datastructures import Headers 

10from .datastructures import MultiDict 

11from .http import parse_options_header 

12from .sansio.multipart import Data 

13from .sansio.multipart import Epilogue 

14from .sansio.multipart import Field 

15from .sansio.multipart import File 

16from .sansio.multipart import MultipartDecoder 

17from .sansio.multipart import NeedData 

18from .urls import url_decode_stream 

19from .wsgi import _make_chunk_iter 

20from .wsgi import get_content_length 

21from .wsgi import get_input_stream 

22 

# there are some platforms where SpooledTemporaryFile is not available.
# In that case we need to provide a fallback.
try:
    from tempfile import SpooledTemporaryFile
except ImportError:
    from tempfile import TemporaryFile

    # Sentinel value: ``default_stream_factory`` checks ``is not None`` and
    # falls back to ``TemporaryFile``/``BytesIO`` when spooling is missing.
    SpooledTemporaryFile = None  # type: ignore

31 

if t.TYPE_CHECKING:
    import typing as te
    from _typeshed.wsgi import WSGIEnvironment

    # Common return shape of the form parsers: ``(stream, form, files)``.
    t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]

    class TStreamFactory(te.Protocol):
        # Structural type of ``default_stream_factory`` and any user-supplied
        # replacement: returns a fresh read/write binary stream for a part.
        def __call__(
            self,
            total_content_length: t.Optional[int],
            content_type: t.Optional[str],
            filename: t.Optional[str],
            content_length: t.Optional[int] = None,
        ) -> t.IO[bytes]:
            ...


# Preserves the decorated callable's signature in ``exhaust_stream``.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])

50 

51 

52def _exhaust(stream: t.IO[bytes]) -> None: 

53 bts = stream.read(64 * 1024) 

54 while bts: 

55 bts = stream.read(64 * 1024) 

56 

57 

def default_stream_factory(
    total_content_length: t.Optional[int],
    content_type: t.Optional[str],
    filename: t.Optional[str],
    content_length: t.Optional[int] = None,
) -> t.IO[bytes]:
    """Default factory for part streams: spool small bodies in memory and
    overflow to a temporary file past 500 KiB.
    """
    spool_limit = 1024 * 500

    # Preferred path: a spooled file handles the memory/disk switch itself.
    if SpooledTemporaryFile is not None:
        return t.cast(
            t.IO[bytes], SpooledTemporaryFile(max_size=spool_limit, mode="rb+")
        )

    # No spooling available: go straight to disk for large or unknown sizes.
    if total_content_length is None or total_content_length > spool_limit:
        return t.cast(t.IO[bytes], TemporaryFile("rb+"))

    return BytesIO()

72 

73 

def parse_form_data(
    environ: "WSGIEnvironment",
    stream_factory: t.Optional["TStreamFactory"] = None,
    charset: str = "utf-8",
    errors: str = "replace",
    max_form_memory_size: t.Optional[int] = None,
    max_content_length: t.Optional[int] = None,
    cls: t.Optional[t.Type[MultiDict]] = None,
    silent: bool = True,
) -> "t_parse_result":
    """Parse the form data in the environ and return it as tuple in the form
    ``(stream, form, files)``. You should only call this method if the
    transport method is `POST`, `PUT`, or `PATCH`.

    If the mimetype of the data transmitted is `multipart/form-data` the
    files multidict will be filled with `FileStorage` objects. If the
    mimetype is unknown the input stream is wrapped and returned as first
    argument, else the stream is empty.

    This is a shortcut for the common usage of :class:`FormDataParser`.

    Have a look at :doc:`/request_data` for more details.

    .. versionadded:: 0.5
       The `max_form_memory_size`, `max_content_length` and
       `cls` parameters were added.

    .. versionadded:: 0.5.1
       The optional `silent` flag was added.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param charset: The character set for URL and url encoded form data.
    :param errors: The encoding error behavior.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :return: A tuple in the form ``(stream, form, files)``.
    """
    # Build the parser explicitly with keyword arguments, then delegate.
    parser = FormDataParser(
        stream_factory=stream_factory,
        charset=charset,
        errors=errors,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        cls=cls,
        silent=silent,
    )
    return parser.parse_from_environ(environ)

133 

134 

def exhaust_stream(f: F) -> F:
    """Decorator that drains the wrapped method's ``stream`` argument on
    return, so the input is fully consumed even if parsing stopped early.
    """

    def wrapper(self, stream, *args, **kwargs):  # type: ignore
        try:
            return f(self, stream, *args, **kwargs)
        finally:
            # Prefer a dedicated exhaust() hook if the stream provides one.
            exhaust = getattr(stream, "exhaust", None)

            if exhaust is None:
                # Otherwise drain manually in 64 KiB chunks until EOF.
                while stream.read(1024 * 64):
                    pass
            else:
                exhaust()

    return update_wrapper(t.cast(F, wrapper), f)

154 

155 

class FormDataParser:
    """This class implements parsing of form data for Werkzeug. By itself
    it can parse multipart and url encoded form data. It can be subclassed
    and extended but for most mimetypes it is a better idea to use the
    untouched stream and expose it as separate attributes on a request
    object.

    .. versionadded:: 0.8

    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param charset: The character set for URL and url encoded form data.
    :param errors: The encoding error behavior.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of parts to be parsed. If this is
        exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    """

    def __init__(
        self,
        stream_factory: t.Optional["TStreamFactory"] = None,
        charset: str = "utf-8",
        errors: str = "replace",
        max_form_memory_size: t.Optional[int] = None,
        max_content_length: t.Optional[int] = None,
        cls: t.Optional[t.Type[MultiDict]] = None,
        silent: bool = True,
        *,
        max_form_parts: t.Optional[int] = None,
    ) -> None:
        # Default to the spool-to-disk factory when no override is given.
        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts

        # The result container class; MultiDict unless the caller supplies one.
        if cls is None:
            cls = MultiDict

        self.cls = cls
        self.silent = silent

    def get_parse_func(
        self, mimetype: str, options: t.Dict[str, str]
    ) -> t.Optional[
        t.Callable[
            ["FormDataParser", t.IO[bytes], str, t.Optional[int], t.Dict[str, str]],
            "t_parse_result",
        ]
    ]:
        """Return the parse function registered for *mimetype*, or ``None``
        if the mimetype is not handled. *options* is unused here but is part
        of the hook signature so subclasses can dispatch on it.
        """
        return self.parse_functions.get(mimetype)

    def parse_from_environ(self, environ: "WSGIEnvironment") -> "t_parse_result":
        """Parses the information from the environment as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        content_type = environ.get("CONTENT_TYPE", "")
        content_length = get_content_length(environ)
        # Split "multipart/form-data; boundary=..." into mimetype + params.
        mimetype, options = parse_options_header(content_type)
        return self.parse(get_input_stream(environ), mimetype, content_length, options)

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: t.Optional[int],
        options: t.Optional[t.Dict[str, str]] = None,
    ) -> "t_parse_result":
        """Parses the information from the given stream, mimetype,
        content length and mimetype parameters.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for
                        the multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.
        """
        # Enforce the request-size limit before touching the body.
        if (
            self.max_content_length is not None
            and content_length is not None
            and content_length > self.max_content_length
        ):
            # if the input stream is not exhausted, firefox reports Connection Reset
            _exhaust(stream)
            raise exceptions.RequestEntityTooLarge()

        if options is None:
            options = {}

        parse_func = self.get_parse_func(mimetype, options)

        if parse_func is not None:
            try:
                return parse_func(self, stream, mimetype, content_length, options)
            except ValueError:
                # Parse errors surface as ValueError; swallowed unless the
                # caller disabled ``silent``.
                if not self.silent:
                    raise

        # Unknown mimetype or silent failure: untouched stream, empty dicts.
        return stream, self.cls(), self.cls()

    @exhaust_stream
    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: t.Optional[int],
        options: t.Dict[str, str],
    ) -> "t_parse_result":
        # Handler for "multipart/form-data"; delegates to MultiPartParser.
        parser = MultiPartParser(
            self.stream_factory,
            self.charset,
            self.errors,
            max_form_memory_size=self.max_form_memory_size,
            cls=self.cls,
            max_form_parts=self.max_form_parts,
        )
        # The boundary comes from the Content-Type parameters and is encoded
        # as ASCII (a non-ASCII boundary raises UnicodeEncodeError here).
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        form, files = parser.parse(stream, boundary, content_length)
        return stream, form, files

    @exhaust_stream
    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: t.Optional[int],
        options: t.Dict[str, str],
    ) -> "t_parse_result":
        # Handler for "application/x-www-form-urlencoded" bodies, which are
        # decoded fully into memory — hence the memory-size limit check.
        if (
            self.max_form_memory_size is not None
            and content_length is not None
            and content_length > self.max_form_memory_size
        ):
            # if the input stream is not exhausted, firefox reports Connection Reset
            _exhaust(stream)
            raise exceptions.RequestEntityTooLarge()

        form = url_decode_stream(stream, self.charset, errors=self.errors, cls=self.cls)
        # urlencoded bodies carry no files, so the files dict is empty.
        return stream, form, self.cls()

    #: mapping of mimetypes to parsing functions
    # NOTE: the values are plain (unbound) functions; ``parse`` calls them
    # explicitly as ``parse_func(self, ...)``.
    parse_functions: t.Dict[
        str,
        t.Callable[
            ["FormDataParser", t.IO[bytes], str, t.Optional[int], t.Dict[str, str]],
            "t_parse_result",
        ],
    ] = {
        "multipart/form-data": _parse_multipart,
        "application/x-www-form-urlencoded": _parse_urlencoded,
        "application/x-url-encoded": _parse_urlencoded,
    }

331 

332 

333def _line_parse(line: str) -> t.Tuple[str, bool]: 

334 """Removes line ending characters and returns a tuple (`stripped_line`, 

335 `is_terminated`). 

336 """ 

337 if line[-2:] == "\r\n": 

338 return line[:-2], True 

339 

340 elif line[-1:] in {"\r", "\n"}: 

341 return line[:-1], True 

342 

343 return line, False 

344 

345 

class MultiPartParser:
    """Parse ``multipart/form-data`` bodies into ``(form, files)`` multidicts
    using the sans-IO :class:`MultipartDecoder` event stream.
    """

    def __init__(
        self,
        stream_factory: t.Optional["TStreamFactory"] = None,
        charset: str = "utf-8",
        errors: str = "replace",
        max_form_memory_size: t.Optional[int] = None,
        cls: t.Optional[t.Type[MultiDict]] = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: t.Optional[int] = None,
    ) -> None:
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts

        # Default to the spool-to-disk factory when no override is given.
        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory

        if cls is None:
            cls = MultiDict

        self.cls = cls

        # Chunk size used when reading the input stream.
        self.buffer_size = buffer_size

    def fail(self, message: str) -> "te.NoReturn":
        # All parse failures are reported as ValueError; FormDataParser.parse
        # catches that type (and may swallow it when ``silent``).
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        # Figure out input charset for current part
        content_type = headers.get("content-type")

        if content_type:
            # NOTE(review): ``mimetype`` is unpacked but unused; only the
            # ``charset`` content-type parameter matters here.
            mimetype, ct_params = parse_options_header(content_type)
            return ct_params.get("charset", self.charset)

        # No per-part content type: fall back to the parser-wide charset.
        return self.charset

    def start_file_streaming(
        self, event: File, total_content_length: t.Optional[int]
    ) -> t.IO[bytes]:
        """Create the output stream for an uploaded file part via the
        configured ``stream_factory``.
        """
        content_type = event.headers.get("content-type")

        # A per-part Content-Length header is optional and may be malformed.
        try:
            content_length = int(event.headers["content-length"])
        except (KeyError, ValueError):
            content_length = 0

        container = self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=content_length,
        )
        return container

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: t.Optional[int]
    ) -> t.Tuple[MultiDict, MultiDict]:
        """Read *stream* and return ``(form_fields, files)`` multidicts.

        Drives the sans-IO :class:`MultipartDecoder`: feed a chunk, then
        drain all events it can produce before feeding the next chunk.
        """
        # ``container`` collects the current part's body: a list of bytes
        # for form fields, a writable stream for file uploads.
        container: t.Union[t.IO[bytes], t.List[bytes]]
        _write: t.Callable[[bytes], t.Any]

        # Trailing ``None`` sentinel signals end-of-input to the decoder
        # (presumably via receive_data(None) — behavior of MultipartDecoder,
        # defined elsewhere).
        iterator = chain(
            _make_chunk_iter(
                stream,
                limit=content_length,
                buffer_size=self.buffer_size,
            ),
            [None],
        )

        parser = MultipartDecoder(
            boundary, self.max_form_memory_size, max_parts=self.max_form_parts
        )

        fields = []
        files = []

        current_part: Union[Field, File]
        for data in iterator:
            parser.receive_data(data)
            event = parser.next_event()
            # Drain events until the decoder needs more input (NeedData)
            # or the body is finished (Epilogue).
            while not isinstance(event, (Epilogue, NeedData)):
                if isinstance(event, Field):
                    # Start of a plain form field: buffer chunks in memory.
                    current_part = event
                    container = []
                    _write = container.append
                elif isinstance(event, File):
                    # Start of a file upload: write into a factory stream.
                    current_part = event
                    container = self.start_file_streaming(event, content_length)
                    _write = container.write
                elif isinstance(event, Data):
                    _write(event.data)
                    # ``more_data`` False marks the end of the current part.
                    if not event.more_data:
                        if isinstance(current_part, Field):
                            # Decode the buffered field using its own charset.
                            value = b"".join(container).decode(
                                self.get_part_charset(current_part.headers), self.errors
                            )
                            fields.append((current_part.name, value))
                        else:
                            # Rewind the file stream so callers read from 0.
                            container = t.cast(t.IO[bytes], container)
                            container.seek(0)
                            files.append(
                                (
                                    current_part.name,
                                    FileStorage(
                                        container,
                                        current_part.filename,
                                        current_part.name,
                                        headers=current_part.headers,
                                    ),
                                )
                            )

                event = parser.next_event()

        return self.cls(fields), self.cls(files)