Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/formparser.py: 28%

169 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import typing as t 

2from functools import update_wrapper 

3from io import BytesIO 

4from itertools import chain 

5from typing import Union 

6 

7from . import exceptions 

8from .datastructures import FileStorage 

9from .datastructures import Headers 

10from .datastructures import MultiDict 

11from .http import parse_options_header 

12from .sansio.multipart import Data 

13from .sansio.multipart import Epilogue 

14from .sansio.multipart import Field 

15from .sansio.multipart import File 

16from .sansio.multipart import MultipartDecoder 

17from .sansio.multipart import NeedData 

18from .urls import url_decode_stream 

19from .wsgi import _make_chunk_iter 

20from .wsgi import get_content_length 

21from .wsgi import get_input_stream 

22 

# there are some platforms where SpooledTemporaryFile is not available.
# In that case we need to provide a fallback.
try:
    from tempfile import SpooledTemporaryFile
except ImportError:
    from tempfile import TemporaryFile

    # Sentinel: ``default_stream_factory`` checks ``SpooledTemporaryFile is
    # not None`` to decide between the spooled implementation and the
    # plain ``TemporaryFile`` / ``BytesIO`` fallback path.
    SpooledTemporaryFile = None  # type: ignore

31 

if t.TYPE_CHECKING:
    import typing as te
    from _typeshed.wsgi import WSGIEnvironment

    # The ``(stream, form, files)`` tuple returned by every parse function.
    t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]

    class TStreamFactory(te.Protocol):
        # Structural type of ``stream_factory`` callables: given metadata
        # about the incoming upload, return a read/write byte stream used
        # to buffer the data (see :func:`default_stream_factory`).
        def __call__(
            self,
            total_content_length: t.Optional[int],
            content_type: t.Optional[str],
            filename: t.Optional[str],
            content_length: t.Optional[int] = None,
        ) -> t.IO[bytes]:
            ...


# Type variable used so decorators (``exhaust_stream``) preserve the
# wrapped callable's signature for type checkers.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])

50 

51 

52def _exhaust(stream: t.IO[bytes]) -> None: 

53 bts = stream.read(64 * 1024) 

54 while bts: 

55 bts = stream.read(64 * 1024) 

56 

57 

def default_stream_factory(
    total_content_length: t.Optional[int],
    content_type: t.Optional[str],
    filename: t.Optional[str],
    content_length: t.Optional[int] = None,
) -> t.IO[bytes]:
    """Return a writable byte buffer suitable for form/file data.

    Prefers :class:`~tempfile.SpooledTemporaryFile`, which keeps up to
    500 KiB in memory before rolling over to disk.  On platforms without
    it, a plain temporary file is used for large or unknown content
    lengths, and an in-memory :class:`~io.BytesIO` otherwise.
    """
    spool_limit = 1024 * 500

    if SpooledTemporaryFile is not None:
        return t.cast(
            t.IO[bytes], SpooledTemporaryFile(max_size=spool_limit, mode="rb+")
        )

    if total_content_length is None or total_content_length > spool_limit:
        return t.cast(t.IO[bytes], TemporaryFile("rb+"))

    return BytesIO()

72 

73 

def parse_form_data(
    environ: "WSGIEnvironment",
    stream_factory: t.Optional["TStreamFactory"] = None,
    charset: str = "utf-8",
    errors: str = "replace",
    max_form_memory_size: t.Optional[int] = None,
    max_content_length: t.Optional[int] = None,
    cls: t.Optional[t.Type[MultiDict]] = None,
    silent: bool = True,
) -> "t_parse_result":
    """Parse the form data in *environ* and return it as a
    ``(stream, form, files)`` tuple.  Only call this for requests whose
    transport method is ``POST``, ``PUT``, or ``PATCH``.

    For a ``multipart/form-data`` payload the ``files`` multidict is
    populated with :class:`FileStorage` objects.  For an unknown mimetype
    the wrapped input stream is returned as the first element; otherwise
    the returned stream is empty.

    This is a shortcut for the common usage of :class:`FormDataParser`.
    Have a look at :doc:`/request_data` for more details.

    .. versionadded:: 0.5
       The `max_form_memory_size`, `max_content_length` and
       `cls` parameters were added.

    .. versionadded:: 0.5.1
       The optional `silent` flag was added.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor.  This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param charset: The character set for URL and url encoded form data.
    :param errors: The encoding error behavior.
    :param max_form_memory_size: the maximum number of bytes to be accepted
                                 for in-memory stored form data.  If the
                                 data exceeds the value specified an
                                 :exc:`~exceptions.RequestEntityTooLarge`
                                 exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use.  If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :return: A tuple in the form ``(stream, form, files)``.
    """
    parser = FormDataParser(
        stream_factory,
        charset,
        errors,
        max_form_memory_size,
        max_content_length,
        cls,
        silent,
    )
    return parser.parse_from_environ(environ)

133 

134 

def exhaust_stream(f: F) -> F:
    """Decorator that drains the wrapped method's ``stream`` argument on exit.

    After the wrapped function returns (or raises), any unread data left on
    the stream is consumed: via the stream's own ``exhaust()`` method when it
    has one, otherwise by reading 64 KiB chunks until EOF.
    """

    def wrapper(self, stream, *args, **kwargs):  # type: ignore
        try:
            return f(self, stream, *args, **kwargs)
        finally:
            exhaust = getattr(stream, "exhaust", None)

            if exhaust is None:
                # No native exhaust support -- read the stream dry.
                while stream.read(1024 * 64):
                    pass
            else:
                exhaust()

    return update_wrapper(t.cast(F, wrapper), f)

154 

155 

class FormDataParser:
    """Werkzeug's form data parser.

    On its own this class handles multipart and url encoded form data.  It
    can be subclassed and extended, but for most mimetypes it is a better
    idea to use the untouched stream and expose it as a separate attribute
    on a request object.

    .. versionadded:: 0.8

    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor.  This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param charset: The character set for URL and url encoded form data.
    :param errors: The encoding error behavior.
    :param max_form_memory_size: the maximum number of bytes to be accepted
                                 for in-memory stored form data.  If the
                                 data exceeds the value specified an
                                 :exc:`~exceptions.RequestEntityTooLarge`
                                 exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use.  If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    """

    def __init__(
        self,
        stream_factory: t.Optional["TStreamFactory"] = None,
        charset: str = "utf-8",
        errors: str = "replace",
        max_form_memory_size: t.Optional[int] = None,
        max_content_length: t.Optional[int] = None,
        cls: t.Optional[t.Type[MultiDict]] = None,
        silent: bool = True,
    ) -> None:
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.cls = MultiDict if cls is None else cls
        self.silent = silent

    def get_parse_func(
        self, mimetype: str, options: t.Dict[str, str]
    ) -> t.Optional[
        t.Callable[
            ["FormDataParser", t.IO[bytes], str, t.Optional[int], t.Dict[str, str]],
            "t_parse_result",
        ]
    ]:
        """Return the parse function registered for *mimetype*, if any."""
        return self.parse_functions.get(mimetype)

    def parse_from_environ(self, environ: "WSGIEnvironment") -> "t_parse_result":
        """Parses the information from the environment as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        ctype = environ.get("CONTENT_TYPE", "")
        clen = get_content_length(environ)
        mimetype, opts = parse_options_header(ctype)
        return self.parse(get_input_stream(environ), mimetype, clen, opts)

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: t.Optional[int],
        options: t.Optional[t.Dict[str, str]] = None,
    ) -> "t_parse_result":
        """Parses the information from the given stream, mimetype,
        content length and mimetype parameters.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for
                        the multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.
        """
        too_large = (
            self.max_content_length is not None
            and content_length is not None
            and content_length > self.max_content_length
        )

        if too_large:
            # if the input stream is not exhausted, firefox reports
            # Connection Reset
            _exhaust(stream)
            raise exceptions.RequestEntityTooLarge()

        if options is None:
            options = {}

        parse_func = self.get_parse_func(mimetype, options)

        if parse_func is None:
            return stream, self.cls(), self.cls()

        try:
            return parse_func(self, stream, mimetype, content_length, options)
        except ValueError:
            if not self.silent:
                raise

        return stream, self.cls(), self.cls()

    @exhaust_stream
    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: t.Optional[int],
        options: t.Dict[str, str],
    ) -> "t_parse_result":
        """Parse a ``multipart/form-data`` body with :class:`MultiPartParser`."""
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        parser = MultiPartParser(
            self.stream_factory,
            self.charset,
            self.errors,
            max_form_memory_size=self.max_form_memory_size,
            cls=self.cls,
        )
        form, files = parser.parse(stream, boundary, content_length)
        return stream, form, files

    @exhaust_stream
    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: t.Optional[int],
        options: t.Dict[str, str],
    ) -> "t_parse_result":
        """Parse an url encoded body into a form multidict (never files)."""
        exceeds_limit = (
            self.max_form_memory_size is not None
            and content_length is not None
            and content_length > self.max_form_memory_size
        )

        if exceeds_limit:
            # if the input stream is not exhausted, firefox reports
            # Connection Reset
            _exhaust(stream)
            raise exceptions.RequestEntityTooLarge()

        form = url_decode_stream(stream, self.charset, errors=self.errors, cls=self.cls)
        return stream, form, self.cls()

    #: mapping of mimetypes to parsing functions
    parse_functions: t.Dict[
        str,
        t.Callable[
            ["FormDataParser", t.IO[bytes], str, t.Optional[int], t.Dict[str, str]],
            "t_parse_result",
        ],
    ] = {
        "multipart/form-data": _parse_multipart,
        "application/x-www-form-urlencoded": _parse_urlencoded,
        "application/x-url-encoded": _parse_urlencoded,
    }

325 

326 

327def _line_parse(line: str) -> t.Tuple[str, bool]: 

328 """Removes line ending characters and returns a tuple (`stripped_line`, 

329 `is_terminated`). 

330 """ 

331 if line[-2:] == "\r\n": 

332 return line[:-2], True 

333 

334 elif line[-1:] in {"\r", "\n"}: 

335 return line[:-1], True 

336 

337 return line, False 

338 

339 

class MultiPartParser:
    """Parses ``multipart/form-data`` request bodies.

    Feeds the raw byte stream into the sans-io :class:`MultipartDecoder`
    and collects the resulting events into form fields and uploaded files.

    :param stream_factory: callable returning a writable byte stream used
        to buffer file parts; defaults to :func:`default_stream_factory`.
    :param charset: fallback character set for decoding field values.
    :param errors: decoding error behavior passed to :meth:`bytes.decode`.
    :param max_form_memory_size: forwarded to :class:`MultipartDecoder`.
    :param cls: multidict class for the returned form/files containers.
    :param buffer_size: chunk size used when reading the input stream.
    """

    def __init__(
        self,
        stream_factory: t.Optional["TStreamFactory"] = None,
        charset: str = "utf-8",
        errors: str = "replace",
        max_form_memory_size: t.Optional[int] = None,
        cls: t.Optional[t.Type[MultiDict]] = None,
        buffer_size: int = 64 * 1024,
    ) -> None:
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size

        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory

        if cls is None:
            cls = MultiDict

        self.cls = cls

        self.buffer_size = buffer_size

    def fail(self, message: str) -> "te.NoReturn":
        """Abort parsing with a :exc:`ValueError` carrying *message*."""
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        """Return the charset for a part, from its ``Content-Type`` header
        parameters if present, else the parser's default charset.
        """
        # Figure out input charset for current part
        content_type = headers.get("content-type")

        if content_type:
            mimetype, ct_params = parse_options_header(content_type)
            return ct_params.get("charset", self.charset)

        return self.charset

    def start_file_streaming(
        self, event: File, total_content_length: t.Optional[int]
    ) -> t.IO[bytes]:
        """Create the buffer stream for an uploaded file part via
        ``self.stream_factory``.

        :param event: the ``File`` event carrying the part's headers.
        :param total_content_length: content length of the whole request,
            passed through to the stream factory.
        """
        content_type = event.headers.get("content-type")

        # A missing or malformed per-part Content-Length falls back to 0.
        try:
            content_length = int(event.headers["content-length"])
        except (KeyError, ValueError):
            content_length = 0

        container = self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=content_length,
        )
        return container

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: t.Optional[int]
    ) -> t.Tuple[MultiDict, MultiDict]:
        """Parse *stream* as a multipart body delimited by *boundary*.

        :param stream: the raw input byte stream.
        :param boundary: the multipart boundary (without leading dashes).
        :param content_length: total request content length, or ``None``.
        :return: ``(form, files)`` multidicts built with ``self.cls``.
        :raises ValueError: on malformed multipart data (raised by the
            decoder); callers such as ``FormDataParser`` may swallow it.
        """
        # ``container`` buffers the current part: a list of byte chunks for
        # ordinary fields, a file object for uploads.  ``_write`` is the
        # matching append/write callable.
        container: t.Union[t.IO[bytes], t.List[bytes]]
        _write: t.Callable[[bytes], t.Any]

        # The trailing ``None`` sentinel tells the decoder the input is
        # complete so it can emit the final events.
        iterator = chain(
            _make_chunk_iter(
                stream,
                limit=content_length,
                buffer_size=self.buffer_size,
            ),
            [None],
        )

        parser = MultipartDecoder(boundary, self.max_form_memory_size)

        fields = []
        files = []

        # Set by the first Field/File event before any Data event arrives;
        # the decoder guarantees this ordering.
        current_part: Union[Field, File]
        for data in iterator:
            parser.receive_data(data)
            event = parser.next_event()
            # Drain all events available for this chunk; NeedData means the
            # decoder wants more input, Epilogue means the body is done.
            while not isinstance(event, (Epilogue, NeedData)):
                if isinstance(event, Field):
                    current_part = event
                    container = []
                    _write = container.append
                elif isinstance(event, File):
                    current_part = event
                    container = self.start_file_streaming(event, content_length)
                    _write = container.write
                elif isinstance(event, Data):
                    _write(event.data)
                    # The last Data event for a part finalizes it.
                    if not event.more_data:
                        if isinstance(current_part, Field):
                            # Decode with the part's own charset if declared.
                            value = b"".join(container).decode(
                                self.get_part_charset(current_part.headers), self.errors
                            )
                            fields.append((current_part.name, value))
                        else:
                            container = t.cast(t.IO[bytes], container)
                            # Rewind so callers read the file from the start.
                            container.seek(0)
                            files.append(
                                (
                                    current_part.name,
                                    FileStorage(
                                        container,
                                        current_part.filename,
                                        current_part.name,
                                        headers=current_part.headers,
                                    ),
                                )
                            )

                event = parser.next_event()

        return self.cls(fields), self.cls(files)