Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/wsgi.py: 18%

398 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:03 +0000

1import io 

2import re 

3import typing as t 

4from functools import partial 

5from functools import update_wrapper 

6from itertools import chain 

7 

8from ._internal import _make_encode_wrapper 

9from ._internal import _to_bytes 

10from ._internal import _to_str 

11from .sansio import utils as _sansio_utils 

12from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API 

13from .urls import _URLTuple 

14from .urls import uri_to_iri 

15from .urls import url_join 

16from .urls import url_parse 

17from .urls import url_quote 

18 

19if t.TYPE_CHECKING: 

20 from _typeshed.wsgi import WSGIApplication 

21 from _typeshed.wsgi import WSGIEnvironment 

22 

23 

24def responder(f: t.Callable[..., "WSGIApplication"]) -> "WSGIApplication": 

25 """Marks a function as responder. Decorate a function with it and it 

26 will automatically call the return value as WSGI application. 

27 

28 Example:: 

29 

30 @responder 

31 def application(environ, start_response): 

32 return Response('Hello World!') 

33 """ 

34 return update_wrapper(lambda *a: f(*a)(*a[-2:]), f) 

35 

36 

def get_current_url(
    environ: "WSGIEnvironment",
    root_only: bool = False,
    strip_querystring: bool = False,
    host_only: bool = False,
    trusted_hosts: t.Optional[t.Iterable[str]] = None,
) -> str:
    """Recreate the URL for a request from the parts in a WSGI
    environment.

    The URL is an IRI, not a URI, so it may contain Unicode characters.
    Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.

    :param environ: The WSGI environment to get the URL parts from.
    :param root_only: Only build the root path, don't include the
        remaining path or query string.
    :param strip_querystring: Don't include the query string.
    :param host_only: Only build the scheme and host.
    :param trusted_hosts: A list of trusted host names to validate the
        host against.
    """
    parts: t.Dict[str, t.Any] = {
        "scheme": environ["wsgi.url_scheme"],
        "host": get_host(environ, trusted_hosts),
    }

    if host_only:
        return _sansio_utils.get_current_url(**parts)

    parts["root_path"] = environ.get("SCRIPT_NAME", "")

    if not root_only:
        parts["path"] = environ.get("PATH_INFO", "")

        if not strip_querystring:
            # WSGI strings carry raw bytes decoded as latin1; recover them.
            parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")

    return _sansio_utils.get_current_url(**parts)

73 

74 

75def _get_server( 

76 environ: "WSGIEnvironment", 

77) -> t.Optional[t.Tuple[str, t.Optional[int]]]: 

78 name = environ.get("SERVER_NAME") 

79 

80 if name is None: 

81 return None 

82 

83 try: 

84 port: t.Optional[int] = int(environ.get("SERVER_PORT", None)) 

85 except (TypeError, ValueError): 

86 # unix socket 

87 port = None 

88 

89 return name, port 

90 

91 

def get_host(
    environ: "WSGIEnvironment", trusted_hosts: t.Optional[t.Iterable[str]] = None
) -> str:
    """Return the host for the given WSGI environment.

    The ``Host`` header is preferred, then ``SERVER_NAME`` if it's not
    set. The returned host will only contain the port if it is different
    than the standard port for the protocol.

    Optionally, verify that the host is trusted using
    :func:`host_is_trusted` and raise a
    :exc:`~werkzeug.exceptions.SecurityError` if it is not.

    :param environ: A WSGI environment dict.
    :param trusted_hosts: A list of trusted host names.

    :return: Host, with port if necessary.
    :raise ~werkzeug.exceptions.SecurityError: If the host is not
        trusted.
    """
    scheme = environ["wsgi.url_scheme"]
    host_header = environ.get("HTTP_HOST")
    server = _get_server(environ)
    return _sansio_utils.get_host(scheme, host_header, server, trusted_hosts)

118 

119 

def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]:
    """Return the content length from the WSGI environment as an
    integer. Returns ``None`` if it is not available or chunked transfer
    encoding is used.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the content length from.
    """
    # A chunked body has no meaningful Content-Length.
    if environ.get("HTTP_TRANSFER_ENCODING", "") == "chunked":
        return None

    raw_length = environ.get("CONTENT_LENGTH")

    if raw_length is None:
        return None

    try:
        # Clamp negative values to zero.
        return max(0, int(raw_length))
    except (ValueError, TypeError):
        return None

139 

140 

def get_input_stream(
    environ: "WSGIEnvironment", safe_fallback: bool = True
) -> t.IO[bytes]:
    """Return the input stream from the WSGI environment, wrapped so it
    is safe to read from without taking the content length into account.

    If content length is not set, the stream will be empty for safety reasons.
    If the WSGI server supports chunked or infinite streams, it should set
    the ``wsgi.input_terminated`` value in the WSGI environ to indicate that.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the stream from.
    :param safe_fallback: use an empty stream as a safe fallback when the
        content length is not set. Disabling this allows infinite streams,
        which can be a denial-of-service risk.
    """
    stream = t.cast(t.IO[bytes], environ["wsgi.input"])

    # WSGI extension: the server promises the stream is terminated, so
    # it can be read to the end without a length limit.
    if environ.get("wsgi.input_terminated"):
        return stream

    content_length = get_content_length(environ)

    if content_length is None:
        # Without a content length the stream could be infinite,
        # maliciously or not. Return an empty stream unless the caller
        # explicitly opted out of that safety fallback.
        return io.BytesIO() if safe_fallback else stream

    # Limit reads to the declared content length.
    return t.cast(t.IO[bytes], LimitedStream(stream, content_length))

177 

178 

def get_query_string(environ: "WSGIEnvironment") -> str:
    """Return the ``QUERY_STRING`` from the WSGI environment, handling
    the WSGI decoding dance. The returned string is restricted to ASCII
    characters.

    :param environ: WSGI environment to get the query string from.

    .. versionadded:: 0.9
    """
    raw = environ.get("QUERY_STRING", "")
    # QUERY_STRING should already be ASCII-safe, but some clients send
    # non-ASCII data; recover the raw bytes from WSGI's latin1 string
    # and percent-quote anything unsafe, keeping reserved characters.
    return url_quote(raw.encode("latin1"), safe=":&%=+$!*'(),")

193 

194 

def get_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Return the ``PATH_INFO`` from the WSGI environment and decode it
    unless ``charset`` is ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path info, or ``None`` if no
        decoding should be performed.
    :param errors: The decoding error handling.

    .. versionadded:: 0.9
    """
    # Recover the raw bytes from WSGI's latin1-decoded string first.
    raw_path = environ.get("PATH_INFO", "").encode("latin1")
    return _to_str(raw_path, charset, errors, allow_none_charset=True)  # type: ignore

210 

211 

def get_script_name(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Return the ``SCRIPT_NAME`` from the WSGI environment and decode
    it unless ``charset`` is set to ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path, or ``None`` if no decoding
        should be performed.
    :param errors: The decoding error handling.

    .. versionadded:: 0.9
    """
    # Recover the raw bytes from WSGI's latin1-decoded string first.
    raw_path = environ.get("SCRIPT_NAME", "").encode("latin1")
    return _to_str(raw_path, charset, errors, allow_none_charset=True)  # type: ignore

227 

228 

def pop_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Removes and returns the next segment of `PATH_INFO`, pushing it onto
    `SCRIPT_NAME`. Returns `None` if there is nothing left on `PATH_INFO`.

    If the `charset` is set to `None` bytes are returned.

    If there are empty segments (``'/foo//bar``) these are ignored but
    properly pushed to the `SCRIPT_NAME`:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> pop_path_info(env)
    'a'
    >>> env['SCRIPT_NAME']
    '/foo/a'
    >>> pop_path_info(env)
    'b'
    >>> env['SCRIPT_NAME']
    '/foo/a/b'

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
       The path is now decoded and a charset and encoding
       parameter can be provided.

    :param environ: the WSGI environment that is modified.
    :param charset: The ``encoding`` parameter passed to
        :func:`bytes.decode`.
    :param errors: The ``errors`` parameter passed to
        :func:`bytes.decode`.
    """
    full_path = environ.get("PATH_INFO")

    if not full_path:
        return None

    script_name = environ.get("SCRIPT_NAME", "")

    # Shift any leading slashes (including duplicates) over to
    # SCRIPT_NAME so no characters are lost.
    stripped = full_path.lstrip("/")
    script_name += "/" * (len(full_path) - len(stripped))

    if "/" in stripped:
        segment, remainder = stripped.split("/", 1)
        environ["PATH_INFO"] = f"/{remainder}"
    else:
        # Last segment: PATH_INFO is fully consumed.
        segment = stripped
        environ["PATH_INFO"] = ""

    environ["SCRIPT_NAME"] = script_name + segment
    return _to_str(  # type: ignore
        segment.encode("latin1"), charset, errors, allow_none_charset=True
    )

285 

286 

def peek_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Returns the next segment on the `PATH_INFO` or `None` if there
    is none. Works like :func:`pop_path_info` without modifying the
    environment:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> peek_path_info(env)
    'a'
    >>> peek_path_info(env)
    'a'

    If the `charset` is set to `None` bytes are returned.

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
       The path is now decoded and a charset and encoding
       parameter can be provided.

    :param environ: the WSGI environment that is checked.
    """
    parts = environ.get("PATH_INFO", "").lstrip("/").split("/", 1)

    # NOTE(review): ``str.split`` always yields at least one element, so
    # this branch is always taken; an empty PATH_INFO returns '' rather
    # than None — preserved as-is for compatibility.
    if parts:
        return _to_str(  # type: ignore
            parts[0].encode("latin1"), charset, errors, allow_none_charset=True
        )

    return None

316 

317 

def extract_path_info(
    environ_or_baseurl: t.Union[str, "WSGIEnvironment"],
    path_or_url: t.Union[str, _URLTuple],
    charset: str = "utf-8",
    errors: str = "werkzeug.url_quote",
    collapse_http_schemes: bool = True,
) -> t.Optional[str]:
    """Extracts the path info from the given URL (or WSGI environment) and
    path. The path info returned is a string. The URLs might also be IRIs.

    If the path info could not be determined, `None` is returned.

    Some examples:

    >>> extract_path_info('http://example.com/app', '/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello',
    ...                   collapse_http_schemes=False) is None
    True

    Instead of providing a base URL you can also pass a WSGI environment.

    :param environ_or_baseurl: a WSGI environment dict, a base URL or
                               base IRI. This is the root of the
                               application.
    :param path_or_url: an absolute path from the server root, a
                        relative path (in which case it's the path info)
                        or a full URL.
    :param charset: the charset for byte data in URLs
    :param errors: the error handling on decode
    :param collapse_http_schemes: if set to `False` the algorithm does
                                  not assume that http and https on the
                                  same server point to the same
                                  resource.

    .. versionchanged:: 0.15
        The ``errors`` parameter defaults to leaving invalid bytes
        quoted instead of replacing them.

    .. versionadded:: 0.6
    """

    def _normalize_netloc(scheme: str, netloc: str) -> str:
        # Drop any userinfo ("user:pass@") prefix, then split host:port.
        parts = netloc.split("@", 1)[-1].split(":", 1)
        port: t.Optional[str]

        if len(parts) == 2:
            netloc, port = parts
            # Drop the port when it is the scheme's default so that
            # "http://host" and "http://host:80" compare equal.
            if (scheme == "http" and port == "80") or (
                scheme == "https" and port == "443"
            ):
                port = None
        else:
            netloc = parts[0]
            port = None

        if port is not None:
            netloc += f":{port}"

        return netloc

    # make sure whatever we are working on is a IRI and parse it
    path = uri_to_iri(path_or_url, charset, errors)
    if isinstance(environ_or_baseurl, dict):
        environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True)
    base_iri = uri_to_iri(environ_or_baseurl, charset, errors)
    base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
    # Join against the base so a relative ``path`` resolves correctly.
    cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]

    # normalize the network location
    base_netloc = _normalize_netloc(base_scheme, base_netloc)
    cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)

    # is that IRI even on a known HTTP scheme?
    if collapse_http_schemes:
        # http and https are treated as interchangeable here.
        for scheme in base_scheme, cur_scheme:
            if scheme not in ("http", "https"):
                return None
    else:
        # Schemes must match exactly (and still be HTTP-family).
        if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
            return None

    # are the netlocs compatible?
    if base_netloc != cur_netloc:
        return None

    # are we below the application path?
    base_path = base_path.rstrip("/")
    if not cur_path.startswith(base_path):
        return None

    return f"/{cur_path[len(base_path) :].lstrip('/')}"

414 

415 

class ClosingIterator:
    """The WSGI specification requires that all middlewares and gateways
    respect the `close` callback of the iterable returned by the application.
    Because it is useful to add another close action to a returned iterable
    and adding a custom iterable is a boring task this class can be used for
    that::

        return ClosingIterator(app(environ, start_response), [cleanup_session,
                                                              cleanup_locals])

    If there is just one close function it can be passed instead of the list.

    A closing iterator is not needed if the application uses response objects
    and finishes the processing if the response is started::

        try:
            return response(environ, start_response)
        finally:
            cleanup_session()
            cleanup_locals()
    """

    def __init__(
        self,
        iterable: t.Iterable[bytes],
        callbacks: t.Optional[
            t.Union[t.Callable[[], None], t.Iterable[t.Callable[[], None]]]
        ] = None,
    ) -> None:
        self._next = t.cast(t.Callable[[], bytes], partial(next, iter(iterable)))

        # Normalize ``callbacks`` (None, single callable, or iterable)
        # into a plain list we can mutate.
        if callbacks is None:
            cleanup: t.List[t.Callable[[], None]] = []
        elif callable(callbacks):
            cleanup = [callbacks]
        else:
            cleanup = list(callbacks)

        # The wrapped iterable's own close callback, if any, must run
        # before the extra callbacks.
        inner_close = getattr(iterable, "close", None)

        if inner_close:
            cleanup.insert(0, inner_close)

        self._callbacks = cleanup

    def __iter__(self) -> "ClosingIterator":
        return self

    def __next__(self) -> bytes:
        return self._next()

    def close(self) -> None:
        for callback in self._callbacks:
            callback()

467 

468 

def wrap_file(
    environ: "WSGIEnvironment", file: t.IO[bytes], buffer_size: int = 8192
) -> t.Iterable[bytes]:
    """Wraps a file. This uses the WSGI server's file wrapper if available
    or otherwise the generic :class:`FileWrapper`.

    .. versionadded:: 0.5

    If the file wrapper from the WSGI server is used it's important to not
    iterate over it from inside the application but to pass it through
    unchanged. If you want to pass out a file wrapper inside a response
    object you have to set :attr:`Response.direct_passthrough` to `True`.

    More information about file wrappers are available in :pep:`333`.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """
    # Prefer the server-provided wrapper (``wsgi.file_wrapper``), which
    # may enable optimizations such as sendfile.
    wrapper = environ.get("wsgi.file_wrapper", FileWrapper)
    return wrapper(file, buffer_size)  # type: ignore

490 

491 

class FileWrapper:
    """This class can be used to convert a :class:`file`-like object into
    an iterable. It yields `buffer_size` blocks until the file is fully
    read.

    You should not use this class directly but rather use the
    :func:`wrap_file` function that uses the WSGI server's file wrapper
    support if it's available.

    .. versionadded:: 0.5

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """

    def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
        self.file = file
        self.buffer_size = buffer_size

    def close(self) -> None:
        # Not every file-like object provides close().
        if hasattr(self.file, "close"):
            self.file.close()

    def seekable(self) -> bool:
        # Prefer the io protocol's seekable(); fall back to probing for
        # a seek method on plainer file-like objects.
        if hasattr(self.file, "seekable"):
            return self.file.seekable()

        return hasattr(self.file, "seek")

    def seek(self, *args: t.Any) -> None:
        if hasattr(self.file, "seek"):
            self.file.seek(*args)

    def tell(self) -> t.Optional[int]:
        if hasattr(self.file, "tell"):
            return self.file.tell()

        return None

    def __iter__(self) -> "FileWrapper":
        return self

    def __next__(self) -> bytes:
        chunk = self.file.read(self.buffer_size)

        if not chunk:
            raise StopIteration()

        return chunk

542 

543 

class _RangeWrapper:
    # private for now, but should we make it public in the future ?

    """This class can be used to convert an iterable object into
    an iterable that will only yield a piece of the underlying content.
    It yields blocks until the underlying stream range is fully read.
    The yielded blocks will have a size that can't exceed the original
    iterator defined block size, but that can be smaller.

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param iterable: an iterable object with a :meth:`__next__` method.
    :param start_byte: byte from which read will start.
    :param byte_range: how many bytes to read.
    """

    def __init__(
        self,
        iterable: t.Union[t.Iterable[bytes], t.IO[bytes]],
        start_byte: int = 0,
        byte_range: t.Optional[int] = None,
    ):
        self.iterable = iter(iterable)
        self.byte_range = byte_range
        self.start_byte = start_byte
        # Absolute offset one past the last byte to yield, or None for
        # "until the underlying iterable is exhausted".
        self.end_byte: t.Optional[int] = None

        if byte_range is not None:
            self.end_byte = start_byte + byte_range

        # Total bytes consumed from the underlying iterable so far.
        self.read_length = 0
        # If the underlying object supports seeking we can jump straight
        # to start_byte instead of reading and discarding.
        self.seekable = (
            hasattr(iterable, "seekable") and iterable.seekable()  # type: ignore
        )
        self.end_reached = False

    def __iter__(self) -> "_RangeWrapper":
        return self

    def _next_chunk(self) -> bytes:
        """Pull the next chunk from the underlying iterable, tracking how
        many bytes have been consumed and flagging exhaustion.
        """
        try:
            chunk = next(self.iterable)
            self.read_length += len(chunk)
            return chunk
        except StopIteration:
            self.end_reached = True
            raise

    def _first_iteration(self) -> t.Tuple[t.Optional[bytes], int]:
        """Position the stream at ``start_byte``.

        Returns ``(chunk, contextual_read_length)`` where ``chunk`` is the
        first (possibly trimmed) chunk at the start position, or ``None``
        when seeking made reading unnecessary.
        """
        chunk = None
        if self.seekable:
            # Seek directly; tell() gives the authoritative position.
            self.iterable.seek(self.start_byte)  # type: ignore
            self.read_length = self.iterable.tell()  # type: ignore
            contextual_read_length = self.read_length
        else:
            # Read and discard chunks until start_byte is inside the
            # current chunk, then trim the leading bytes off it.
            while self.read_length <= self.start_byte:
                chunk = self._next_chunk()
            if chunk is not None:
                chunk = chunk[self.start_byte - self.read_length :]
            contextual_read_length = self.start_byte
        return chunk, contextual_read_length

    def _next(self) -> bytes:
        """Return the next chunk, trimmed so the total never passes
        ``end_byte``. Raises ``StopIteration`` once the range is done.
        """
        if self.end_reached:
            raise StopIteration()
        chunk = None
        # contextual_read_length is the absolute offset at which the
        # returned chunk begins, used to slice off any excess.
        contextual_read_length = self.read_length
        if self.read_length == 0:
            chunk, contextual_read_length = self._first_iteration()
        if chunk is None:
            chunk = self._next_chunk()
        if self.end_byte is not None and self.read_length >= self.end_byte:
            self.end_reached = True
            return chunk[: self.end_byte - contextual_read_length]
        return chunk

    def __next__(self) -> bytes:
        chunk = self._next()
        if chunk:
            return chunk
        # An empty trimmed chunk means the range is exhausted.
        self.end_reached = True
        raise StopIteration()

    def close(self) -> None:
        # Forward close() to the underlying iterable if it has one.
        if hasattr(self.iterable, "close"):
            self.iterable.close()  # type: ignore

631 

632 

633def _make_chunk_iter( 

634 stream: t.Union[t.Iterable[bytes], t.IO[bytes]], 

635 limit: t.Optional[int], 

636 buffer_size: int, 

637) -> t.Iterator[bytes]: 

638 """Helper for the line and chunk iter functions.""" 

639 if isinstance(stream, (bytes, bytearray, str)): 

640 raise TypeError( 

641 "Passed a string or byte object instead of true iterator or stream." 

642 ) 

643 if not hasattr(stream, "read"): 

644 for item in stream: 

645 if item: 

646 yield item 

647 return 

648 stream = t.cast(t.IO[bytes], stream) 

649 if not isinstance(stream, LimitedStream) and limit is not None: 

650 stream = t.cast(t.IO[bytes], LimitedStream(stream, limit)) 

651 _read = stream.read 

652 while True: 

653 item = _read(buffer_size) 

654 if not item: 

655 break 

656 yield item 

657 

658 

def make_line_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Safely iterates line-based over an input stream. If the input stream
    is not a :class:`LimitedStream` the `limit` parameter is mandatory.

    This uses the stream's :meth:`~file.read` method internally as opposite
    to the :meth:`~file.readline` method that is unsafe and can only be used
    in violation of the WSGI specification. The same problem applies to the
    `__iter__` function of the input stream which calls :meth:`~file.readline`
    without arguments.

    If you need line-by-line processing it's strongly recommended to iterate
    over the input stream using this helper function.

    .. versionchanged:: 0.8
        This function now ensures that the limit was reached.

    .. versionadded:: 0.9
        added support for iterators as input stream.

    .. versionadded:: 0.11.10
        added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterate to iterate over.
    :param limit: the limit in bytes for the stream. (Usually
                  content length. Not necessary if the `stream`
                  is a :class:`LimitedStream`.
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size. Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    # The "" default also terminates byte streams: next() only returns
    # it on exhaustion, and both "" and b"" are falsy.
    first_item = next(_iter, "")
    if not first_item:
        return

    # The first item decides whether we operate on str or bytes; ``s``
    # converts the literal markers below to the matching type.
    s = _make_encode_wrapper(first_item)
    empty = t.cast(bytes, s(""))
    cr = t.cast(bytes, s("\r"))
    lf = t.cast(bytes, s("\n"))
    crlf = t.cast(bytes, s("\r\n"))

    # Push the probed first item back in front of the remaining chunks.
    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))

    def _iter_basic_lines() -> t.Iterator[bytes]:
        """Yield lines assembled from raw chunks; a line that spans a
        chunk boundary is carried over in ``buffer`` until terminated.
        """
        _join = empty.join
        buffer: t.List[bytes] = []
        while True:
            new_data = next(_iter, "")
            if not new_data:
                break
            new_buf: t.List[bytes] = []
            buf_size = 0
            for item in t.cast(
                t.Iterator[bytes], chain(buffer, new_data.splitlines(True))
            ):
                new_buf.append(item)
                buf_size += len(item)
                # splitlines(True) keeps the terminator, so a piece
                # ending in \r or \n completes a line.
                if item and item[-1:] in crlf:
                    yield _join(new_buf)
                    new_buf = []
                elif cap_at_buffer and buf_size >= buffer_size:
                    # NOTE(review): buf_size is not reset after a line is
                    # yielded above, so this cap can trigger earlier than
                    # strictly necessary — appears harmless; confirm
                    # before changing.
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
            buffer = new_buf
        # Flush any unterminated final line.
        if buffer:
            yield _join(buffer)

    # This hackery is necessary to merge 'foo\r' and '\n' into one item
    # of 'foo\r\n' if we were unlucky and we hit a chunk boundary.
    previous = empty
    for item in _iter_basic_lines():
        if item == lf and previous[-1:] == cr:
            previous += item
            item = empty
        if previous:
            yield previous
        previous = item
    if previous:
        yield previous

749 

750 

def make_chunk_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    separator: bytes,
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Works like :func:`make_line_iter` but accepts a separator
    which divides chunks. If you want newline based processing
    you should use :func:`make_line_iter` instead as it
    supports arbitrary newline markers.

    .. versionadded:: 0.8

    .. versionadded:: 0.9
        added support for iterators as input stream.

    .. versionadded:: 0.11.10
        added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterate to iterate over.
    :param separator: the separator that divides chunks.
    :param limit: the limit in bytes for the stream. (Usually
                  content length. Not necessary if the `stream`
                  is otherwise already limited).
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size. Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    first_item = next(_iter, b"")
    if not first_item:
        return

    # Push the probed first item back in front of the remaining data.
    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
    # Match the separator, splitter, and joiner to the stream's type
    # (str vs bytes), decided by the first item.
    if isinstance(first_item, str):
        separator = _to_str(separator)
        _split = re.compile(f"({re.escape(separator)})").split
        _join = "".join
    else:
        separator = _to_bytes(separator)
        _split = re.compile(b"(" + re.escape(separator) + b")").split
        _join = b"".join

    # ``buffer`` carries split pieces that may still be joined with data
    # from the next raw chunk (a chunk boundary can fall mid-piece).
    buffer: t.List[bytes] = []
    while True:
        new_data = next(_iter, b"")
        if not new_data:
            break
        # The capturing group keeps separators as standalone list items.
        chunks = _split(new_data)
        new_buf: t.List[bytes] = []
        buf_size = 0
        for item in chain(buffer, chunks):
            if item == separator:
                # A full chunk is complete: emit everything buffered.
                yield _join(new_buf)
                new_buf = []
                buf_size = 0
            else:
                buf_size += len(item)
                new_buf.append(item)

                # Optionally split over-long pieces at the buffer size.
                if cap_at_buffer and buf_size >= buffer_size:
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
                    buf_size = len(rv)

        buffer = new_buf
    # Flush the trailing piece after the final separator, if any.
    if buffer:
        yield _join(buffer)

826 

827 

class LimitedStream(io.IOBase):
    """Wraps a stream so that it doesn't read more than n bytes. If the
    stream is exhausted and the caller tries to get more bytes from it
    :func:`on_exhausted` is called which by default returns an empty
    string. The return value of that function is forwarded
    to the reader function. So if it returns an empty string
    :meth:`read` will return an empty string as well.

    The limit however must never be higher than what the stream can
    output. Otherwise :meth:`readlines` will try to read past the
    limit.

    .. admonition:: Note on WSGI compliance

       calls to :meth:`readline` and :meth:`readlines` are not
       WSGI compliant because it passes a size argument to the
       readline methods. Unfortunately the WSGI PEP is not safely
       implementable without a size argument to :meth:`readline`
       because there is no EOF marker in the stream. As a result
       of that the use of :meth:`readline` is discouraged.

       For the same reason iterating over the :class:`LimitedStream`
       is not portable. It internally calls :meth:`readline`.

       We strongly suggest using :meth:`read` only or using the
       :func:`make_line_iter` which safely iterates line-based
       over a WSGI input stream.

    :param stream: the stream to wrap.
    :param limit: the limit for the stream, must not be longer than
                  what the string can provide if the stream does not
                  end with `EOF` (like `wsgi.input`)
    """

    def __init__(self, stream: t.IO[bytes], limit: int) -> None:
        # Bind the underlying read methods once; the wrapped stream
        # object itself is not kept.
        self._read = stream.read
        self._readline = stream.readline
        # Number of bytes consumed so far, compared against ``limit``.
        self._pos = 0
        self.limit = limit

    def __iter__(self) -> "LimitedStream":
        return self

    @property
    def is_exhausted(self) -> bool:
        """If the stream is exhausted this attribute is `True`."""
        return self._pos >= self.limit

    def on_exhausted(self) -> bytes:
        """This is called when the stream tries to read past the limit.
        The return value of this function is returned from the reading
        function.
        """
        # Read null bytes from the stream so that we get the
        # correct end of stream marker.
        return self._read(0)

    def on_disconnect(self) -> bytes:
        """What should happen if a disconnect is detected? The return
        value of this function is returned from read functions in case
        the client went away. By default a
        :exc:`~werkzeug.exceptions.ClientDisconnected` exception is raised.
        """
        # Imported lazily to avoid a circular import at module load.
        from .exceptions import ClientDisconnected

        raise ClientDisconnected()

    def exhaust(self, chunk_size: int = 1024 * 64) -> None:
        """Exhaust the stream. This consumes all the data left until the
        limit is reached.

        :param chunk_size: the size for a chunk. It will read the chunk
                           until the stream is exhausted and throw away
                           the results.
        """
        to_read = self.limit - self._pos
        chunk = chunk_size
        while to_read > 0:
            # The final chunk may be smaller than chunk_size.
            chunk = min(to_read, chunk)
            self.read(chunk)
            to_read -= chunk

    def read(self, size: t.Optional[int] = None) -> bytes:
        """Read `size` bytes or if size is not provided everything is read.

        :param size: the number of bytes read.
        """
        if self._pos >= self.limit:
            return self.on_exhausted()
        if size is None or size == -1:  # -1 is for consistence with file
            size = self.limit
        # Never read past the remaining budget.
        to_read = min(self.limit - self._pos, size)
        try:
            read = self._read(to_read)
        except (OSError, ValueError):
            return self.on_disconnect()
        # A short read means the client went away before sending the
        # promised number of bytes.
        if to_read and len(read) != to_read:
            return self.on_disconnect()
        self._pos += len(read)
        return read

    def readline(self, size: t.Optional[int] = None) -> bytes:
        """Reads one line from the stream."""
        if self._pos >= self.limit:
            return self.on_exhausted()
        # Cap the readline size at the remaining budget.
        if size is None:
            size = self.limit - self._pos
        else:
            size = min(size, self.limit - self._pos)
        try:
            line = self._readline(size)
        except (ValueError, OSError):
            return self.on_disconnect()
        # Budget remained but nothing came back: client disconnected.
        if size and not line:
            return self.on_disconnect()
        self._pos += len(line)
        return line

    def readlines(self, size: t.Optional[int] = None) -> t.List[bytes]:
        """Reads a file into a list of strings. It calls :meth:`readline`
        until the file is read to the end. It does support the optional
        `size` argument if the underlying stream supports it for
        `readline`.
        """
        last_pos = self._pos
        result = []
        # ``end`` is the absolute position at which to stop reading.
        if size is not None:
            end = min(self.limit, last_pos + size)
        else:
            end = self.limit
        while True:
            if size is not None:
                # NOTE(review): shrinks the per-readline budget by the
                # bytes consumed in the previous iteration (last_pos is
                # refreshed after each readline below) — confirm before
                # restructuring, the bookkeeping is subtle.
                size -= last_pos - self._pos
            if self._pos >= end:
                break
            result.append(self.readline(size))
            if size is not None:
                last_pos = self._pos
        return result

    def tell(self) -> int:
        """Returns the position of the stream.

        .. versionadded:: 0.9
        """
        return self._pos

    def __next__(self) -> bytes:
        line = self.readline()
        if not line:
            raise StopIteration()
        return line

    def readable(self) -> bool:
        return True