Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/formparser.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

147 statements  

1from __future__ import annotations 

2 

3import typing as t 

4from io import BytesIO 

5from urllib.parse import parse_qsl 

6 

7from ._internal import _plain_int 

8from .datastructures import FileStorage 

9from .datastructures import Headers 

10from .datastructures import MultiDict 

11from .exceptions import RequestEntityTooLarge 

12from .http import parse_options_header 

13from .sansio.multipart import Data 

14from .sansio.multipart import Epilogue 

15from .sansio.multipart import Field 

16from .sansio.multipart import File 

17from .sansio.multipart import MultipartDecoder 

18from .sansio.multipart import NeedData 

19from .wsgi import get_content_length 

20from .wsgi import get_input_stream 

21 

# SpooledTemporaryFile keeps small bodies in memory and only spills to
# disk past its ``max_size``. There are some platforms where it is not
# available; in that case we need to provide a fallback.
try:
    from tempfile import SpooledTemporaryFile
except ImportError:
    from tempfile import TemporaryFile

    # Sentinel checked by ``default_stream_factory`` to select the
    # TemporaryFile/BytesIO fallback path.
    SpooledTemporaryFile = None  # type: ignore

30 

if t.TYPE_CHECKING:
    import typing as te

    from _typeshed.wsgi import WSGIEnvironment

    # Result of form parsing: the (possibly consumed) body stream, the
    # text form fields, and the uploaded files.
    t_parse_result = t.Tuple[
        t.IO[bytes], MultiDict[str, str], MultiDict[str, FileStorage]
    ]

    class TStreamFactory(te.Protocol):
        """Signature of the callable that allocates the writable stream
        each uploaded file part is written to. Works the same as
        ``default_stream_factory``.
        """

        def __call__(
            self,
            total_content_length: int | None,
            content_type: str | None,
            filename: str | None,
            content_length: int | None = None,
        ) -> t.IO[bytes]: ...


# Generic callable type variable (decorator-style helpers).
F = t.TypeVar("F", bound=t.Callable[..., t.Any])

51 

52 

def default_stream_factory(
    total_content_length: int | None,
    content_type: str | None,
    filename: str | None,
    content_length: int | None = None,
) -> t.IO[bytes]:
    """Allocate the stream a request body part is written to.

    Prefers :class:`~tempfile.SpooledTemporaryFile`, which buffers in
    memory up to the spool limit and transparently moves to disk beyond
    it. On platforms without it, a body of known size at or below the
    limit goes to an in-memory :class:`~io.BytesIO`; anything larger or
    of unknown size goes to a plain :func:`~tempfile.TemporaryFile`.
    """
    spool_limit = 500 * 1024

    if SpooledTemporaryFile is None:
        # Fallback platforms: no spooling available, so the memory/disk
        # decision must be made up front from the declared length.
        fits_in_memory = (
            total_content_length is not None and total_content_length <= spool_limit
        )

        if fits_in_memory:
            return BytesIO()

        return t.cast(t.IO[bytes], TemporaryFile("rb+"))

    return t.cast(t.IO[bytes], SpooledTemporaryFile(max_size=spool_limit, mode="rb+"))

67 

68 

def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: TStreamFactory | None = None,
    max_form_memory_size: int | None = None,
    max_content_length: int | None = None,
    cls: type[MultiDict[str, t.Any]] | None = None,
    silent: bool = True,
    *,
    max_form_parts: int | None = None,
) -> t_parse_result:
    """Parse the form data in *environ*, returning ``(stream, form, files)``.

    Only call this for requests whose method carries a body (``POST``,
    ``PUT``, or ``PATCH``). For ``multipart/form-data`` bodies the files
    multidict is populated with :class:`FileStorage` objects; for an
    unknown mimetype the wrapped input stream is returned unparsed as the
    first tuple element, otherwise the stream is empty.

    This is a shortcut for the common usage of :class:`FormDataParser`.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor. This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data. If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param cls: an optional dict class to use. If this is not specified
        or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :return: A tuple in the form ``(stream, form, files)``.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.5.1
        Added the ``silent`` parameter.

    .. versionadded:: 0.5
        Added the ``max_form_memory_size``, ``max_content_length``, and
        ``cls`` parameters.
    """
    parser = FormDataParser(
        stream_factory=stream_factory,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        max_form_parts=max_form_parts,
        silent=silent,
        cls=cls,
    )
    return parser.parse_from_environ(environ)

131 

132 

class FormDataParser:
    """Parser for HTML form data in a request body.

    Handles ``multipart/form-data`` and
    ``application/x-www-form-urlencoded`` payloads; any other mimetype is
    passed through untouched. It can be subclassed and extended, but for
    most mimetypes it is a better idea to use the untouched stream and
    expose it as separate attributes on a request object.

    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor. This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data. If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param cls: an optional dict class to use. If this is not specified
        or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 3.0
        The ``parse_functions`` attribute and ``get_parse_func`` methods
        were removed.

    .. versionchanged:: 2.2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.8
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        max_content_length: int | None = None,
        cls: type[MultiDict[str, t.Any]] | None = None,
        silent: bool = True,
        *,
        max_form_parts: int | None = None,
    ) -> None:
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts
        self.silent = silent

        # Fall back to the module-level default for file spooling.
        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory

        if cls is None:
            cls = t.cast("type[MultiDict[str, t.Any]]", MultiDict)

        self.cls = cls

    def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
        """Parse the form data for a WSGI environment.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
        return self.parse(
            get_input_stream(environ, max_content_length=self.max_content_length),
            content_length=get_content_length(environ),
            mimetype=mimetype,
            options=options,
        )

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str] | None = None,
    ) -> t_parse_result:
        """Parse the information from the given stream, mimetype, content
        length, and mimetype parameters.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for
            the multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.

        .. versionchanged:: 3.0
            The invalid ``application/x-url-encoded`` content type is not
            treated as ``application/x-www-form-urlencoded``.
        """
        if mimetype == "multipart/form-data":
            handler = self._parse_multipart
        elif mimetype == "application/x-www-form-urlencoded":
            handler = self._parse_urlencoded
        else:
            # Unknown content type: nothing to parse, hand back the raw
            # stream with empty form and file containers.
            return stream, self.cls(), self.cls()

        try:
            return handler(stream, mimetype, content_length, options or {})
        except ValueError:
            if not self.silent:
                raise

        return stream, self.cls(), self.cls()

    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        # The boundary comes from the Content-Type parameters and is
        # required for multipart parsing to be possible at all.
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        parser = MultiPartParser(
            stream_factory=self.stream_factory,
            max_form_memory_size=self.max_form_memory_size,
            max_form_parts=self.max_form_parts,
            cls=self.cls,
        )
        form, files = parser.parse(stream, boundary, content_length)
        return stream, form, files

    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        # Reject a body whose declared length already exceeds the
        # in-memory budget, before reading anything.
        too_large = (
            self.max_form_memory_size is not None
            and content_length is not None
            and content_length > self.max_form_memory_size
        )

        if too_large:
            raise RequestEntityTooLarge()

        try:
            items = parse_qsl(
                stream.read().decode(),
                keep_blank_values=True,
                errors="werkzeug.url_quote",
            )
        except ValueError as e:
            raise RequestEntityTooLarge() from e

        return stream, self.cls(items), self.cls()

294 

295 

class MultiPartParser:
    """Low-level parser for ``multipart/form-data`` bodies.

    Feeds the raw stream through a :class:`MultipartDecoder` and collects
    text parts into form fields and file parts into :class:`FileStorage`
    objects written via the configured stream factory.
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        cls: type[MultiDict[str, t.Any]] | None = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: int | None = None,
    ) -> None:
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts
        self.buffer_size = buffer_size

        # Fall back to the module-level default for file spooling.
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )

        if cls is None:
            cls = t.cast("type[MultiDict[str, t.Any]]", MultiDict)

        self.cls = cls

    def fail(self, message: str) -> te.NoReturn:
        """Abort parsing by raising a :exc:`ValueError` with *message*."""
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        """Return the charset to decode a text part with, based on the
        part's ``Content-Type`` header. Unknown or missing charsets fall
        back to UTF-8.
        """
        content_type = headers.get("content-type")

        if not content_type:
            return "utf-8"

        declared = parse_options_header(content_type)[1].get("charset", "").lower()

        # A safe list of encodings. Modern clients should only send ASCII
        # or UTF-8. This list will not be extended further.
        if declared in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
            return declared

        return "utf-8"

    def start_file_streaming(
        self, event: File, total_content_length: int | None
    ) -> t.IO[bytes]:
        """Open the output stream for an uploaded file part by calling
        the configured stream factory with the part's metadata.
        """
        headers = event.headers

        # A missing or malformed per-part Content-Length is treated as 0.
        try:
            part_length = _plain_int(headers["content-length"])
        except (KeyError, ValueError):
            part_length = 0

        return self.stream_factory(
            total_content_length=total_content_length,
            content_type=headers.get("content-type"),
            filename=event.filename,
            content_length=part_length,
        )

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
    ) -> tuple[MultiDict[str, str], MultiDict[str, FileStorage]]:
        """Parse the multipart body read from *stream* and return the
        collected ``(fields, files)`` multidicts.

        :param stream: the raw body stream.
        :param boundary: the multipart boundary from the content type.
        :param content_length: the total declared body length, if known.
        """
        # Per-part state; a part's events may span several input chunks,
        # so these persist across iterations of the outer read loop.
        part: Field | File
        sink: t.IO[bytes] | list[bytes]
        write: t.Callable[[bytes], t.Any]

        decoder = MultipartDecoder(
            boundary,
            max_form_memory_size=self.max_form_memory_size,
            max_parts=self.max_form_parts,
        )

        field_items = []
        file_items = []

        for chunk in _chunk_iter(stream.read, self.buffer_size):
            decoder.receive_data(chunk)
            event = decoder.next_event()

            while not isinstance(event, (Epilogue, NeedData)):
                if isinstance(event, Data):
                    write(event.data)

                    if not event.more_data:
                        # Part complete: finalize it as a field or a file.
                        if isinstance(part, Field):
                            charset = self.get_part_charset(part.headers)
                            text = b"".join(sink).decode(charset, "replace")
                            field_items.append((part.name, text))
                        else:
                            spool = t.cast(t.IO[bytes], sink)
                            spool.seek(0)
                            storage = FileStorage(
                                spool,
                                part.filename,
                                part.name,
                                headers=part.headers,
                            )
                            file_items.append((part.name, storage))
                elif isinstance(event, Field):
                    # Text field: buffer chunks in memory and join later.
                    part = event
                    sink = []
                    write = sink.append
                elif isinstance(event, File):
                    # File upload: stream chunks through the factory.
                    part = event
                    sink = self.start_file_streaming(event, content_length)
                    write = sink.write

                event = decoder.next_event()

        return self.cls(field_items), self.cls(file_items)

409 

410 

411def _chunk_iter(read: t.Callable[[int], bytes], size: int) -> t.Iterator[bytes | None]: 

412 """Read data in chunks for multipart/form-data parsing. Stop if no data is read. 

413 Yield ``None`` at the end to signal end of parsing. 

414 """ 

415 while True: 

416 data = read(size) 

417 

418 if not data: 

419 break 

420 

421 yield data 

422 

423 yield None