Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/formparser.py: 25%

148 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 07:17 +0000

1from __future__ import annotations 

2 

3import typing as t 

4from io import BytesIO 

5from urllib.parse import parse_qsl 

6 

7from ._internal import _plain_int 

8from .datastructures import FileStorage 

9from .datastructures import Headers 

10from .datastructures import MultiDict 

11from .exceptions import RequestEntityTooLarge 

12from .http import parse_options_header 

13from .sansio.multipart import Data 

14from .sansio.multipart import Epilogue 

15from .sansio.multipart import Field 

16from .sansio.multipart import File 

17from .sansio.multipart import MultipartDecoder 

18from .sansio.multipart import NeedData 

19from .wsgi import get_content_length 

20from .wsgi import get_input_stream 

21 

# there are some platforms where SpooledTemporaryFile is not available.
# In that case we need to provide a fallback.
try:
    from tempfile import SpooledTemporaryFile
except ImportError:
    from tempfile import TemporaryFile

    # Sentinel: ``default_stream_factory`` checks ``SpooledTemporaryFile is
    # not None`` to decide whether spooling is available on this platform.
    SpooledTemporaryFile = None  # type: ignore

30 

if t.TYPE_CHECKING:
    import typing as te
    from _typeshed.wsgi import WSGIEnvironment

    # Result tuple shared by the form-data parsers:
    # (body stream, form fields, uploaded files).
    t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]

    class TStreamFactory(te.Protocol):
        """Structural type for stream factories such as
        :func:`default_stream_factory`: a callable returning a readable and
        writable binary stream for an uploaded file.
        """

        def __call__(
            self,
            total_content_length: int | None,
            content_type: str | None,
            filename: str | None,
            content_length: int | None = None,
        ) -> t.IO[bytes]:
            ...

48F = t.TypeVar("F", bound=t.Callable[..., t.Any]) 

49 

50 

def default_stream_factory(
    total_content_length: int | None,
    content_type: str | None,
    filename: str | None,
    content_length: int | None = None,
) -> t.IO[bytes]:
    """Return a writable binary stream for buffering an uploaded file.

    Prefers :class:`tempfile.SpooledTemporaryFile`, which keeps small
    uploads in memory and spills to disk past the spool limit. On platforms
    without it, falls back to a plain temporary file for large or
    unknown-length bodies, and an in-memory buffer otherwise.

    :param total_content_length: length of the whole request body, if known.
    :param content_type: mimetype of the uploaded file (unused here).
    :param filename: name of the uploaded file (unused here).
    :param content_length: length of this part, if known (unused here).
    """
    spool_limit = 1024 * 500

    if SpooledTemporaryFile is not None:
        return t.cast(
            t.IO[bytes], SpooledTemporaryFile(max_size=spool_limit, mode="rb+")
        )

    # No spooling available: buffer on disk unless the body is known to be
    # small enough to hold in memory.
    if total_content_length is None or total_content_length > spool_limit:
        return t.cast(t.IO[bytes], TemporaryFile("rb+"))

    return BytesIO()

65 

66 

def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: TStreamFactory | None = None,
    max_form_memory_size: int | None = None,
    max_content_length: int | None = None,
    cls: type[MultiDict] | None = None,
    silent: bool = True,
    *,
    max_form_parts: int | None = None,
) -> t_parse_result:
    """Parse the form data in the environ and return it as the tuple
    ``(stream, form, files)``. Only call this for transport methods that
    carry a body, such as ``POST``, ``PUT``, or ``PATCH``.

    For ``multipart/form-data`` bodies, uploaded files are collected as
    :class:`FileStorage` objects in the files multidict. For unknown
    mimetypes, the wrapped input stream is returned unchanged as the first
    element and both multidicts are empty.

    This is a shortcut for the common usage of :class:`FormDataParser`.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be parsed. If this
        is exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :return: A tuple in the form ``(stream, form, files)``.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.5.1
        Added the ``silent`` parameter.

    .. versionadded:: 0.5
        Added the ``max_form_memory_size``, ``max_content_length``, and ``cls``
        parameters.
    """
    parser = FormDataParser(
        stream_factory=stream_factory,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        max_form_parts=max_form_parts,
        silent=silent,
        cls=cls,
    )
    return parser.parse_from_environ(environ)

129 

130 

class FormDataParser:
    """Parses form data for Werkzeug. On its own it handles multipart and
    url-encoded bodies; for any other mimetype the untouched stream is
    handed back so a request object can expose it directly. It can be
    subclassed and extended for other mimetypes.

    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be parsed. If this
        is exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 3.0
        The ``parse_functions`` attribute and ``get_parse_func`` methods were removed.

    .. versionchanged:: 2.2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.8
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        max_content_length: int | None = None,
        cls: type[MultiDict] | None = None,
        silent: bool = True,
        *,
        max_form_parts: int | None = None,
    ) -> None:
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts
        self.cls = MultiDict if cls is None else cls
        self.silent = silent

    def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
        """Parses the information from the environment as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        body = get_input_stream(environ, max_content_length=self.max_content_length)
        length = get_content_length(environ)
        mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
        return self.parse(
            body,
            content_length=length,
            mimetype=mimetype,
            options=options,
        )

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str] | None = None,
    ) -> t_parse_result:
        """Parses the information from the given stream, mimetype,
        content length and mimetype parameters.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for
                        the multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.

        .. versionchanged:: 3.0
            The invalid ``application/x-url-encoded`` content type is not
            treated as ``application/x-www-form-urlencoded``.
        """
        if mimetype == "multipart/form-data":
            handler = self._parse_multipart
        elif mimetype == "application/x-www-form-urlencoded":
            handler = self._parse_urlencoded
        else:
            # Unknown mimetype: hand the stream back untouched with empty
            # form/files containers.
            return stream, self.cls(), self.cls()

        try:
            return handler(stream, mimetype, content_length, options or {})
        except ValueError:
            if not self.silent:
                raise

        # Parsing failed but ``silent`` is set: behave as if there were no
        # form data at all.
        return stream, self.cls(), self.cls()

    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        # A multipart body is unparseable without its boundary parameter.
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        parser = MultiPartParser(
            stream_factory=self.stream_factory,
            max_form_memory_size=self.max_form_memory_size,
            max_form_parts=self.max_form_parts,
            cls=self.cls,
        )
        form, files = parser.parse(stream, boundary, content_length)
        return stream, form, files

    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        memory_limit = self.max_form_memory_size

        if (
            memory_limit is not None
            and content_length is not None
            and content_length > memory_limit
        ):
            raise RequestEntityTooLarge()

        try:
            items = parse_qsl(
                stream.read().decode(),
                keep_blank_values=True,
                errors="werkzeug.url_quote",
            )
        except ValueError as e:
            # An oversized field count or similar decode failure is reported
            # as the body being too large.
            raise RequestEntityTooLarge() from e

        return stream, self.cls(items), self.cls()

292 

293 

class MultiPartParser:
    """Low-level parser for ``multipart/form-data`` bodies.

    Reads the input stream in chunks, feeds them to a sans-IO
    :class:`MultipartDecoder`, and assembles the decoded events into form
    fields and uploaded files.

    :param stream_factory: callable that returns a writable stream for each
        uploaded file; defaults to :func:`default_stream_factory`.
    :param max_form_memory_size: forwarded to :class:`MultipartDecoder` as
        its in-memory form data limit.
    :param cls: multidict class used for the returned containers; defaults
        to :class:`MultiDict`.
    :param buffer_size: number of bytes read from the stream per iteration.
    :param max_form_parts: forwarded to :class:`MultipartDecoder` as its
        part-count limit.
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        cls: type[MultiDict] | None = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: int | None = None,
    ) -> None:
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts

        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory

        if cls is None:
            cls = MultiDict

        self.cls = cls
        self.buffer_size = buffer_size

    def fail(self, message: str) -> te.NoReturn:
        """Abort parsing by raising a :exc:`ValueError` with *message*.

        NOTE(review): not called within this module — presumably kept as a
        hook for subclasses; confirm before removing.
        """
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        """Return the text charset for one part, from its ``Content-Type``
        header, falling back to UTF-8.
        """
        # Figure out input charset for current part
        content_type = headers.get("content-type")

        if content_type:
            parameters = parse_options_header(content_type)[1]
            ct_charset = parameters.get("charset", "").lower()

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further.
            if ct_charset in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                return ct_charset

        return "utf-8"

    def start_file_streaming(
        self, event: File, total_content_length: int | None
    ) -> t.IO[bytes]:
        """Create the output stream for an uploaded file via the configured
        stream factory.

        :param event: the ``File`` event that opened this part.
        :param total_content_length: length of the whole request body, if known.
        """
        content_type = event.headers.get("content-type")

        # The part's own Content-Length header is advisory; fall back to 0
        # when it is missing or not a plain integer.
        try:
            content_length = _plain_int(event.headers["content-length"])
        except (KeyError, ValueError):
            content_length = 0

        container = self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=content_length,
        )
        return container

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
    ) -> tuple[MultiDict, MultiDict]:
        """Parse the multipart body from *stream*.

        :param stream: the request body stream.
        :param boundary: the multipart boundary from the Content-Type header.
        :param content_length: length of the whole body, if known.
        :return: ``(form, files)`` multidicts of field values and
            :class:`FileStorage` objects.
        """
        # State for the part currently being assembled. These are set by a
        # Field/File event before any Data event for that part arrives.
        current_part: Field | File
        container: t.IO[bytes] | list[bytes]
        _write: t.Callable[[bytes], t.Any]

        parser = MultipartDecoder(
            boundary,
            max_form_memory_size=self.max_form_memory_size,
            max_parts=self.max_form_parts,
        )

        fields: list[tuple[str, str]] = []
        files: list[tuple[str, FileStorage]] = []

        # ``_chunk_iter`` yields a trailing ``None`` to signal end of input
        # to the decoder.
        for data in _chunk_iter(stream.read, self.buffer_size):
            parser.receive_data(data)
            event = parser.next_event()
            # Drain all events available for this chunk; NeedData means the
            # decoder wants more input, Epilogue means the body is done.
            while not isinstance(event, (Epilogue, NeedData)):
                if isinstance(event, Field):
                    # Plain form field: buffer its data in memory.
                    current_part = event
                    container = []
                    _write = container.append
                elif isinstance(event, File):
                    # Uploaded file: stream its data through the factory.
                    current_part = event
                    container = self.start_file_streaming(event, content_length)
                    _write = container.write
                elif isinstance(event, Data):
                    _write(event.data)
                    if not event.more_data:
                        # Final data chunk for this part: finalize it.
                        if isinstance(current_part, Field):
                            value = b"".join(container).decode(
                                self.get_part_charset(current_part.headers), "replace"
                            )
                            fields.append((current_part.name, value))
                        else:
                            container = t.cast(t.IO[bytes], container)
                            # Rewind so FileStorage reads from the start.
                            container.seek(0)
                            files.append(
                                (
                                    current_part.name,
                                    FileStorage(
                                        container,
                                        current_part.filename,
                                        current_part.name,
                                        headers=current_part.headers,
                                    ),
                                )
                            )

                event = parser.next_event()

        return self.cls(fields), self.cls(files)

407 

408 

409def _chunk_iter(read: t.Callable[[int], bytes], size: int) -> t.Iterator[bytes | None]: 

410 """Read data in chunks for multipart/form-data parsing. Stop if no data is read. 

411 Yield ``None`` at the end to signal end of parsing. 

412 """ 

413 while True: 

414 data = read(size) 

415 

416 if not data: 

417 break 

418 

419 yield data 

420 

421 yield None