Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/wsgi.py: 17%

396 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import io 

2import re 

3import typing as t 

4import warnings 

5from functools import partial 

6from functools import update_wrapper 

7from itertools import chain 

8 

9from ._internal import _make_encode_wrapper 

10from ._internal import _to_bytes 

11from ._internal import _to_str 

12from .sansio import utils as _sansio_utils 

13from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API 

14from .urls import _URLTuple 

15from .urls import uri_to_iri 

16from .urls import url_join 

17from .urls import url_parse 

18from .urls import url_quote 

19 

20if t.TYPE_CHECKING: 

21 from _typeshed.wsgi import WSGIApplication 

22 from _typeshed.wsgi import WSGIEnvironment 

23 

24 

def responder(f: t.Callable[..., "WSGIApplication"]) -> "WSGIApplication":
    """Turn a function that *returns* a WSGI application into one.

    The decorated function is called with all positional arguments it
    receives; whatever it returns is then immediately invoked with the
    last two of those arguments (``environ`` and ``start_response``).

    Example::

        @responder
        def application(environ, start_response):
            return Response('Hello World!')
    """

    def _call_returned_app(*args):  # type: ignore
        # Slicing the last two arguments lets this work for plain
        # functions as well as for methods that receive ``self`` first.
        returned_app = f(*args)
        return returned_app(*args[-2:])

    return update_wrapper(_call_returned_app, f)

36 

37 

def get_current_url(
    environ: "WSGIEnvironment",
    root_only: bool = False,
    strip_querystring: bool = False,
    host_only: bool = False,
    trusted_hosts: t.Optional[t.Iterable[str]] = None,
) -> str:
    """Rebuild the request URL from the pieces stored in a WSGI
    environment.

    The result is an IRI rather than a URI, so it may contain Unicode
    characters. Use :func:`~werkzeug.urls.iri_to_uri` to convert it to
    ASCII.

    :param environ: The WSGI environment to get the URL parts from.
    :param root_only: Stop after the root path; the remaining path and
        the query string are left out.
    :param strip_querystring: Leave out the query string.
    :param host_only: Stop after the scheme and host.
    :param trusted_hosts: A list of trusted host names to validate the
        host against.
    """
    url_parts = {
        "scheme": environ["wsgi.url_scheme"],
        "host": get_host(environ, trusted_hosts),
    }

    # Each flag cuts the URL off at its own level and thereby also drops
    # every component that would follow it.
    with_root = not host_only
    with_path = with_root and not root_only
    with_query = with_path and not strip_querystring

    if with_root:
        url_parts["root_path"] = environ.get("SCRIPT_NAME", "")

    if with_path:
        url_parts["path"] = environ.get("PATH_INFO", "")

    if with_query:
        raw_query = environ.get("QUERY_STRING", "")
        url_parts["query_string"] = raw_query.encode("latin1")

    return _sansio_utils.get_current_url(**url_parts)

74 

75 

def _get_server(
    environ: "WSGIEnvironment",
) -> t.Optional[t.Tuple[str, t.Optional[int]]]:
    """Return ``(SERVER_NAME, SERVER_PORT)`` from the WSGI environment.

    Returns ``None`` when ``SERVER_NAME`` is absent. The port is
    ``None`` when ``SERVER_PORT`` is missing or is not an integer.
    """
    server_name = environ.get("SERVER_NAME")

    if server_name is None:
        return None

    raw_port = environ.get("SERVER_PORT")
    server_port: t.Optional[int]

    try:
        server_port = int(raw_port)
    except (TypeError, ValueError):
        # unix socket
        server_port = None

    return server_name, server_port

91 

92 

def get_host(
    environ: "WSGIEnvironment", trusted_hosts: t.Optional[t.Iterable[str]] = None
) -> str:
    """Determine the host for a WSGI environment.

    The ``Host`` header takes precedence; ``SERVER_NAME`` is used when
    the header is not set. The port is only part of the result if it
    differs from the standard port for the protocol.

    When ``trusted_hosts`` is given, the host is checked with
    :func:`host_is_trusted` and a
    :exc:`~werkzeug.exceptions.SecurityError` is raised if it is not
    trusted.

    :param environ: A WSGI environment dict.
    :param trusted_hosts: A list of trusted host names.

    :return: Host, with port if necessary.
    :raise ~werkzeug.exceptions.SecurityError: If the host is not
        trusted.
    """
    url_scheme = environ["wsgi.url_scheme"]
    host_header = environ.get("HTTP_HOST")
    server = _get_server(environ)
    return _sansio_utils.get_host(url_scheme, host_header, server, trusted_hosts)

119 

120 

def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]:
    """Return the request's content length as an integer.

    ``None`` is returned if the length is not available or chunked
    transfer encoding is used. The ``CONTENT_LENGTH`` and
    ``HTTP_TRANSFER_ENCODING`` values are read from the environment and
    handed to the sans-IO helper of the same name.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the content length from.
    """
    raw_length = environ.get("CONTENT_LENGTH")
    transfer_encoding = environ.get("HTTP_TRANSFER_ENCODING", "")
    return _sansio_utils.get_content_length(
        http_content_length=raw_length,
        http_transfer_encoding=transfer_encoding,
    )

134 

135 

def get_input_stream(
    environ: "WSGIEnvironment", safe_fallback: bool = True
) -> t.IO[bytes]:
    """Return the WSGI input stream, wrapped so that it is safe to read
    without looking at the content length yourself.

    In most cases the returned object is not the raw ``wsgi.input``
    stream. Without a content length the stream is empty for safety
    reasons. A WSGI server that supports chunked or infinite streams
    should set ``wsgi.input_terminated`` in the environ to indicate
    that.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the stream from.
    :param safe_fallback: use an empty stream as a safe fallback when the
        content length is not set. Disabling this allows infinite streams,
        which can be a denial-of-service risk.
    """
    raw_stream = t.cast(t.IO[bytes], environ["wsgi.input"])
    length = get_content_length(environ)

    # The server promises the stream terminates on its own, so it can be
    # handed out as is and read until the end.
    if environ.get("wsgi.input_terminated"):
        return raw_stream

    # A known length: cap reads at exactly that many bytes.
    if length is not None:
        return t.cast(t.IO[bytes], LimitedStream(raw_stream, length))

    # No length at all. The raw stream could be infinite (maliciously or
    # not), so only expose it when the caller opted out of the fallback.
    if safe_fallback:
        return io.BytesIO()

    return raw_stream

172 

173 

def get_query_string(environ: "WSGIEnvironment") -> str:
    """Return ``QUERY_STRING`` from the WSGI environment, quoted so the
    result is restricted to ASCII characters. This also takes care of
    the WSGI decoding dance.

    :param environ: WSGI environment to get the query string from.

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionadded:: 0.9
    """
    warnings.warn(
        "'get_query_string' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )
    raw_query = environ.get("QUERY_STRING", "")
    query_bytes = raw_query.encode("latin1")
    # The query string should already be ASCII safe, but some browsers
    # send raw non-ASCII data anyway; quote whatever is not in the safe
    # set.
    return url_quote(query_bytes, safe=":&%=+$!*'(),")

196 

197 

def get_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Read ``PATH_INFO`` from the WSGI environment and decode it,
    unless ``charset`` is ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path info, or ``None`` if no
        decoding should be performed.
    :param errors: The decoding error handling.

    .. versionadded:: 0.9
    """
    raw_path = environ.get("PATH_INFO", "")
    path_bytes = raw_path.encode("latin1")
    return _to_str(  # type: ignore
        path_bytes, charset, errors, allow_none_charset=True
    )

213 

214 

def get_script_name(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Read ``SCRIPT_NAME`` from the WSGI environment and decode it,
    unless ``charset`` is ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path, or ``None`` if no decoding
        should be performed.
    :param errors: The decoding error handling.

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionadded:: 0.9
    """
    warnings.warn(
        "'get_script_name' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )
    raw_script_name = environ.get("SCRIPT_NAME", "")
    script_name_bytes = raw_script_name.encode("latin1")
    return _to_str(  # type: ignore
        script_name_bytes, charset, errors, allow_none_charset=True
    )

238 

239 

def pop_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Move the next segment of `PATH_INFO` over to `SCRIPT_NAME` and
    return it. Returns `None` if `PATH_INFO` is empty or missing.

    If the `charset` is set to `None` bytes are returned.

    Empty segments (``'/foo//bar``) are skipped, but the slashes that
    delimit them are still pushed onto `SCRIPT_NAME`:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> pop_path_info(env)
    'a'
    >>> env['SCRIPT_NAME']
    '/foo/a'
    >>> pop_path_info(env)
    'b'
    >>> env['SCRIPT_NAME']
    '/foo/a/b'

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
        The path is now decoded and a charset and encoding
        parameter can be provided.

    :param environ: the WSGI environment that is modified.
    :param charset: The ``encoding`` parameter passed to
        :func:`bytes.decode`.
    :param errors: The ``errors`` parameter passed to
        :func:`bytes.decode`.
    """
    warnings.warn(
        "'pop_path_info' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )

    raw_path = environ.get("PATH_INFO")
    if not raw_path:
        return None

    # Every leading slash that is stripped here is appended to the
    # script name so that no character of the full path gets lost.
    stripped = raw_path.lstrip("/")
    leading_slashes = "/" * (len(raw_path) - len(stripped))
    script_prefix = environ.get("SCRIPT_NAME", "") + leading_slashes

    # ``slash`` is empty when the stripped path was the final segment.
    segment, slash, remainder = stripped.partition("/")
    environ["PATH_INFO"] = f"/{remainder}" if slash else ""
    environ["SCRIPT_NAME"] = script_prefix + segment

    return _to_str(  # type: ignore
        segment.encode("latin1"), charset, errors, allow_none_charset=True
    )

305 

306 

def peek_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Returns the next segment on the `PATH_INFO` or `None` if there
    is none. Works like :func:`pop_path_info` without modifying the
    environment:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> peek_path_info(env)
    'a'
    >>> peek_path_info(env)
    'a'

    If the `charset` is set to `None` bytes are returned.

    `None` is returned when `PATH_INFO` is empty or missing, which is
    the same condition under which :func:`pop_path_info` returns
    `None`. A path made up only of slashes yields an empty segment.

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
        The path is now decoded and a charset and encoding
        parameter can be provided.

    :param environ: the WSGI environment that is checked.
    :param charset: The charset used to decode the segment, or `None`
        to get bytes back.
    :param errors: The decoding error handling.
    """
    warnings.warn(
        "'peek_path_info' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )

    path = environ.get("PATH_INFO")

    # ``str.split`` never returns an empty list, so emptiness has to be
    # detected on the path itself for the documented ``None`` result to
    # be reachable.
    if not path:
        return None

    first_segment = path.lstrip("/").split("/", 1)[0]
    return _to_str(  # type: ignore
        first_segment.encode("latin1"), charset, errors, allow_none_charset=True
    )

345 

346 

def extract_path_info(
    environ_or_baseurl: t.Union[str, "WSGIEnvironment"],
    path_or_url: t.Union[str, _URLTuple],
    charset: str = "utf-8",
    errors: str = "werkzeug.url_quote",
    collapse_http_schemes: bool = True,
) -> t.Optional[str]:
    """Extracts the path info from the given URL (or WSGI environment) and
    path. The path info returned is a string. The URLs might also be IRIs.

    If the path info could not be determined, `None` is returned. This
    includes the case where the path only shares a textual prefix with
    the application root without being below it (``/application`` is
    not below ``/app``).

    Some examples:

    >>> extract_path_info('http://example.com/app', '/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello',
    ...                   collapse_http_schemes=False) is None
    True

    Instead of providing a base URL you can also pass a WSGI environment.

    :param environ_or_baseurl: a WSGI environment dict, a base URL or
                               base IRI.  This is the root of the
                               application.
    :param path_or_url: an absolute path from the server root, a
                        relative path (in which case it's the path info)
                        or a full URL.
    :param charset: the charset for byte data in URLs
    :param errors: the error handling on decode
    :param collapse_http_schemes: if set to `False` the algorithm does
                                  not assume that http and https on the
                                  same server point to the same
                                  resource.

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionchanged:: 0.15
        The ``errors`` parameter defaults to leaving invalid bytes
        quoted instead of replacing them.

    .. versionadded:: 0.6

    """
    warnings.warn(
        "'extract_path_info' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )

    def _normalize_netloc(scheme: str, netloc: str) -> str:
        # Drop any ``user:password@`` prefix and the port when it is the
        # default one for the scheme, so equivalent hosts compare equal.
        parts = netloc.split("@", 1)[-1].split(":", 1)
        port: t.Optional[str]

        if len(parts) == 2:
            netloc, port = parts
            if (scheme == "http" and port == "80") or (
                scheme == "https" and port == "443"
            ):
                port = None
        else:
            netloc = parts[0]
            port = None

        if port is not None:
            netloc += f":{port}"

        return netloc

    # make sure whatever we are working on is a IRI and parse it
    path = uri_to_iri(path_or_url, charset, errors)
    if isinstance(environ_or_baseurl, dict):
        environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True)
    base_iri = uri_to_iri(environ_or_baseurl, charset, errors)
    base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
    cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]

    # normalize the network location
    base_netloc = _normalize_netloc(base_scheme, base_netloc)
    cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)

    # is that IRI even on a known HTTP scheme?
    if collapse_http_schemes:
        for scheme in base_scheme, cur_scheme:
            if scheme not in ("http", "https"):
                return None
    else:
        if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
            return None

    # are the netlocs compatible?
    if base_netloc != cur_netloc:
        return None

    # are we below the application path?
    base_path = base_path.rstrip("/")
    if not cur_path.startswith(base_path):
        return None

    remainder = cur_path[len(base_path) :]

    # A plain prefix test would also accept "/application" for the base
    # "/app". Being below the application means the match has to end on
    # a segment boundary: nothing left over, or a "/" right after it.
    if base_path and remainder and not remainder.startswith("/"):
        return None

    return f"/{remainder.lstrip('/')}"

452 

453 

class ClosingIterator:
    """Wrap an iterable and attach extra ``close`` actions to it.

    The WSGI specification requires that all middlewares and gateways
    respect the `close` callback of the iterable returned by the
    application. Writing a custom iterable just to add another close
    action is tedious, so this class can be used instead::

        return ClosingIterator(app(environ, start_response), [cleanup_session,
                                                              cleanup_locals])

    If there is just one close function it can be passed instead of the list.

    A closing iterator is not needed if the application uses response objects
    and finishes the processing if the response is started::

        try:
            return response(environ, start_response)
        finally:
            cleanup_session()
            cleanup_locals()
    """

    def __init__(
        self,
        iterable: t.Iterable[bytes],
        callbacks: t.Optional[
            t.Union[t.Callable[[], None], t.Iterable[t.Callable[[], None]]]
        ] = None,
    ) -> None:
        source = iter(iterable)
        self._next = t.cast(t.Callable[[], bytes], partial(next, source))

        # Accept nothing, a single callable, or any iterable of callables.
        close_funcs: t.List[t.Callable[[], None]]
        if callbacks is None:
            close_funcs = []
        elif callable(callbacks):
            close_funcs = [callbacks]
        else:
            close_funcs = list(callbacks)

        # The wrapped iterable's own ``close`` runs before the extras.
        wrapped_close = getattr(iterable, "close", None)
        if wrapped_close:
            close_funcs = [wrapped_close, *close_funcs]

        self._callbacks = close_funcs

    def __iter__(self) -> "ClosingIterator":
        return self

    def __next__(self) -> bytes:
        return self._next()

    def close(self) -> None:
        """Call every registered close callback in order."""
        for close_func in self._callbacks:
            close_func()

505 

506 

def wrap_file(
    environ: "WSGIEnvironment", file: t.IO[bytes], buffer_size: int = 8192
) -> t.Iterable[bytes]:
    """Wrap a file in an iterable, using the WSGI server's file wrapper
    (``wsgi.file_wrapper`` in the environ) if available and the generic
    :class:`FileWrapper` otherwise.

    .. versionadded:: 0.5

    If the file wrapper from the WSGI server is used it's important to not
    iterate over it from inside the application but to pass it through
    unchanged.  If you want to pass out a file wrapper inside a response
    object you have to set :attr:`Response.direct_passthrough` to `True`.

    More information about file wrappers is available in :pep:`333`.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """
    wrapper_factory = environ.get("wsgi.file_wrapper", FileWrapper)
    return wrapper_factory(file, buffer_size)  # type: ignore

528 

529 

class FileWrapper:
    """Turn a :class:`file`-like object into an iterable that yields
    blocks of up to `buffer_size` bytes until the file is fully read.

    You should not use this class directly but rather use the
    :func:`wrap_file` function that uses the WSGI server's file wrapper
    support if it's available.

    .. versionadded:: 0.5

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """

    def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
        self.file = file
        self.buffer_size = buffer_size

    def close(self) -> None:
        """Close the wrapped file if it has a ``close`` method."""
        if not hasattr(self.file, "close"):
            return
        self.file.close()

    def seekable(self) -> bool:
        """Report whether the wrapped file can seek.

        The file's own ``seekable()`` answer is used when it has one;
        otherwise having a ``seek`` attribute counts as seekable.
        """
        if hasattr(self.file, "seekable"):
            return self.file.seekable()
        return hasattr(self.file, "seek")

    def seek(self, *args: t.Any) -> None:
        """Forward to the file's ``seek``; do nothing if it has none."""
        if not hasattr(self.file, "seek"):
            return
        self.file.seek(*args)

    def tell(self) -> t.Optional[int]:
        """Return the file's position, or ``None`` if it has no ``tell``."""
        if not hasattr(self.file, "tell"):
            return None
        return self.file.tell()

    def __iter__(self) -> "FileWrapper":
        return self

    def __next__(self) -> bytes:
        block = self.file.read(self.buffer_size)
        if not block:
            # An empty read marks the end of the file.
            raise StopIteration()
        return block

580 

581 

class _RangeWrapper:
    # private for now, but should we make it public in the future ?

    """This class can be used to convert an iterable object into
    an iterable that will only yield a piece of the underlying content.
    It yields blocks until the underlying stream range is fully read.
    The yielded blocks will have a size that can't exceed the original
    iterator defined block size, but that can be smaller.

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param iterable: an iterable object with a :meth:`__next__` method.
    :param start_byte: byte from which read will start.
    :param byte_range: how many bytes to read. ``None`` means there is
        no end position and chunks are yielded until the iterable is
        exhausted.
    """

    def __init__(
        self,
        iterable: t.Union[t.Iterable[bytes], t.IO[bytes]],
        start_byte: int = 0,
        byte_range: t.Optional[int] = None,
    ):
        self.iterable = iter(iterable)
        self.byte_range = byte_range
        self.start_byte = start_byte
        # Absolute offset of the first byte that is NOT part of the
        # range; stays ``None`` when no ``byte_range`` was given.
        self.end_byte = None

        if byte_range is not None:
            self.end_byte = start_byte + byte_range

        # Running absolute position in the underlying content: advanced
        # by the length of every chunk fetched, or set from ``tell()``
        # after seeking.
        self.read_length = 0
        # Note that this is a plain bool attribute, not a method. It is
        # probed on the original ``iterable``, while seek/tell are later
        # called on ``iter(iterable)``.
        self.seekable = (
            hasattr(iterable, "seekable") and iterable.seekable()  # type: ignore
        )
        self.end_reached = False

    def __iter__(self) -> "_RangeWrapper":
        return self

    def _next_chunk(self) -> bytes:
        """Fetch the next chunk from the wrapped iterator and add its
        length to ``read_length``. Marks the end as reached and
        re-raises when the iterator is exhausted.
        """
        try:
            chunk = next(self.iterable)
            self.read_length += len(chunk)
            return chunk
        except StopIteration:
            self.end_reached = True
            raise

    def _first_iteration(self) -> t.Tuple[t.Optional[bytes], int]:
        """Position the wrapper at ``start_byte``.

        :return: a tuple of the first chunk (``None`` when the wrapped
            object was seeked and nothing has been read yet) and the
            absolute offset that the next yielded data starts at.
        """
        chunk = None
        if self.seekable:
            # Jump straight to the start and take the position reported
            # by the wrapped object as the current offset.
            self.iterable.seek(self.start_byte)  # type: ignore
            self.read_length = self.iterable.tell()  # type: ignore
            contextual_read_length = self.read_length
        else:
            # Not seekable: consume chunks until one reaches past
            # ``start_byte``.
            while self.read_length <= self.start_byte:
                chunk = self._next_chunk()
            if chunk is not None:
                # ``read_length`` is now beyond ``start_byte``, so the
                # index is negative and keeps only the tail of the chunk
                # that begins at ``start_byte``.
                chunk = chunk[self.start_byte - self.read_length :]
            contextual_read_length = self.start_byte
        return chunk, contextual_read_length

    def _next(self) -> bytes:
        """Return the next chunk of the range, trimmed at ``end_byte``
        when the chunk reaches or crosses it.
        """
        if self.end_reached:
            raise StopIteration()
        chunk = None
        # Absolute offset of the first byte of the chunk about to be
        # returned; needed to trim at the end position.
        contextual_read_length = self.read_length
        if self.read_length == 0:
            # Nothing has been counted yet, so seek or skip to the start.
            chunk, contextual_read_length = self._first_iteration()
        if chunk is None:
            chunk = self._next_chunk()
        if self.end_byte is not None and self.read_length >= self.end_byte:
            # This chunk reaches the end of the range: cut it there and
            # stop on the next call.
            self.end_reached = True
            return chunk[: self.end_byte - contextual_read_length]
        return chunk

    def __next__(self) -> bytes:
        chunk = self._next()
        if chunk:
            return chunk
        # An empty chunk is treated as the end of the range.
        self.end_reached = True
        raise StopIteration()

    def close(self) -> None:
        """Close the wrapped iterator if it has a ``close`` method."""
        if hasattr(self.iterable, "close"):
            self.iterable.close()  # type: ignore

669 

670 

def _make_chunk_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    limit: t.Optional[int],
    buffer_size: int,
) -> t.Iterator[bytes]:
    """Helper for the line and chunk iter functions.

    Yields the non-empty items of a plain iterable, or ``buffer_size``
    reads from an object with a ``read`` method until a read comes back
    empty. A readable stream that is not already a
    :class:`LimitedStream` is wrapped in one when ``limit`` is given.
    As this is a generator, the :exc:`TypeError` for string/bytes input
    is raised on the first iteration, not at call time.
    """
    if isinstance(stream, (bytes, bytearray, str)):
        raise TypeError(
            "Passed a string or byte object instead of true iterator or stream."
        )

    if hasattr(stream, "read"):
        reader = t.cast(t.IO[bytes], stream)
        if limit is not None and not isinstance(reader, LimitedStream):
            reader = t.cast(t.IO[bytes], LimitedStream(reader, limit))

        read_block = reader.read
        block = read_block(buffer_size)
        while block:
            yield block
            block = read_block(buffer_size)
    else:
        # Plain iterable: pass the items through, dropping empty ones.
        yield from filter(None, stream)

695 

696 

def make_line_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Safely iterates line-based over an input stream.  If the input stream
    is not a :class:`LimitedStream` the `limit` parameter is mandatory.

    This uses the stream's :meth:`~file.read` method internally as opposed
    to the :meth:`~file.readline` method that is unsafe and can only be used
    in violation of the WSGI specification.  The same problem applies to the
    `__iter__` function of the input stream which calls :meth:`~file.readline`
    without arguments.

    If you need line-by-line processing it's strongly recommended to iterate
    over the input stream using this helper function.

    .. versionchanged:: 0.8
       This function now ensures that the limit was reached.

    .. versionadded:: 0.9
       added support for iterators as input stream.

    .. versionadded:: 0.11.10
       added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterable to iterate over.
    :param limit: the limit in bytes for the stream.  (Usually
                  content length.  Not necessary if the `stream`
                  is a :class:`LimitedStream`.)
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size.  Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    # Pull the first chunk up front: an empty input yields nothing, and
    # the chunk's type decides which kind of literals are used below.
    first_item = next(_iter, "")
    if not first_item:
        return

    # presumably ``s`` converts a str literal to the type of
    # ``first_item`` (str or bytes) -- confirm in ``_internal``.
    s = _make_encode_wrapper(first_item)
    empty = t.cast(bytes, s(""))
    cr = t.cast(bytes, s("\r"))
    lf = t.cast(bytes, s("\n"))
    crlf = t.cast(bytes, s("\r\n"))

    # Put the chunk that was peeked at back in front of the rest.
    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))

    def _iter_basic_lines() -> t.Iterator[bytes]:
        _join = empty.join
        # Pieces of a line that has not been terminated yet, carried
        # over from the previous chunk.
        buffer: t.List[bytes] = []
        while True:
            new_data = next(_iter, "")
            if not new_data:
                break
            new_buf: t.List[bytes] = []
            buf_size = 0
            for item in t.cast(
                t.Iterator[bytes], chain(buffer, new_data.splitlines(True))
            ):
                new_buf.append(item)
                buf_size += len(item)
                # The piece ends in "\r" or "\n": a line is complete.
                if item and item[-1:] in crlf:
                    yield _join(new_buf)
                    new_buf = []
                # NOTE(review): ``buf_size`` is not reset after a yield
                # in this function, unlike in ``make_chunk_iter``; it
                # only starts at zero again with the next chunk.
                elif cap_at_buffer and buf_size >= buffer_size:
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
            buffer = new_buf
        # Whatever is left is a final line without a terminator.
        if buffer:
            yield _join(buffer)

    # This hackery is necessary to merge 'foo\r' and '\n' into one item
    # of 'foo\r\n' if we were unlucky and we hit a chunk boundary.
    # Each item is held back for one iteration so that a lone "\n" can
    # still be appended to it.
    previous = empty
    for item in _iter_basic_lines():
        if item == lf and previous[-1:] == cr:
            previous += item
            item = empty
        if previous:
            yield previous
        previous = item
    if previous:
        yield previous

787 

788 

def make_chunk_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    separator: bytes,
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Works like :func:`make_line_iter` but accepts a separator
    which divides chunks.  If you want newline based processing
    you should use :func:`make_line_iter` instead as it
    supports arbitrary newline markers.

    The separator itself is not part of the yielded chunks.

    .. versionadded:: 0.8

    .. versionadded:: 0.9
       added support for iterators as input stream.

    .. versionadded:: 0.11.10
       added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterable to iterate over.
    :param separator: the separator that divides chunks.
    :param limit: the limit in bytes for the stream.  (Usually
                  content length.  Not necessary if the `stream`
                  is otherwise already limited).
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size.  Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    # Pull the first chunk up front: an empty input yields nothing, and
    # the chunk's type decides between the str and bytes code paths.
    first_item = next(_iter, b"")
    if not first_item:
        return

    # Put the chunk that was peeked at back in front of the rest.
    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
    if isinstance(first_item, str):
        separator = _to_str(separator)
        # The capture group makes ``split`` return the separators as
        # items of their own, which the loop below relies on.
        _split = re.compile(f"({re.escape(separator)})").split
        _join = "".join
    else:
        separator = _to_bytes(separator)
        _split = re.compile(b"(" + re.escape(separator) + b")").split
        _join = b"".join

    # Pieces of a chunk that has not been terminated by a separator yet,
    # carried over from the previous read.
    buffer: t.List[bytes] = []
    while True:
        new_data = next(_iter, b"")
        if not new_data:
            break
        chunks = _split(new_data)
        new_buf: t.List[bytes] = []
        buf_size = 0
        for item in chain(buffer, chunks):
            if item == separator:
                # A separator closes the current chunk.
                yield _join(new_buf)
                new_buf = []
                buf_size = 0
            else:
                buf_size += len(item)
                new_buf.append(item)

                # With capping enabled, emit full ``buffer_size`` slices
                # right away and keep only the remainder buffered.
                if cap_at_buffer and buf_size >= buffer_size:
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
                    buf_size = len(rv)

        buffer = new_buf
    # Whatever is left is a final chunk without a trailing separator.
    if buffer:
        yield _join(buffer)

864 

865 

class LimitedStream(io.IOBase):
    """Wraps a stream so that it doesn't read more than n bytes.  If the
    stream is exhausted and the caller tries to get more bytes from it
    :func:`on_exhausted` is called which by default returns an empty
    string.  The return value of that function is forwarded
    to the reader function.  So if it returns an empty string
    :meth:`read` will return an empty string as well.

    The limit however must never be higher than what the stream can
    output.  Otherwise :meth:`readlines` will try to read past the
    limit.

    A negative ``size`` passed to :meth:`read` or :meth:`readline`
    means "as much as the limit allows", following the convention of
    file objects. It is never forwarded to the wrapped stream, where it
    would read past the limit.

    .. admonition:: Note on WSGI compliance

       calls to :meth:`readline` and :meth:`readlines` are not
       WSGI compliant because it passes a size argument to the
       readline methods.  Unfortunately the WSGI PEP is not safely
       implementable without a size argument to :meth:`readline`
       because there is no EOF marker in the stream.  As a result
       of that the use of :meth:`readline` is discouraged.

       For the same reason iterating over the :class:`LimitedStream`
       is not portable.  It internally calls :meth:`readline`.

       We strongly suggest using :meth:`read` only or using the
       :func:`make_line_iter` which safely iterates line-based
       over a WSGI input stream.

    :param stream: the stream to wrap.
    :param limit: the limit for the stream, must not be longer than
                  what the stream can provide if the stream does not
                  end with `EOF` (like `wsgi.input`)
    """

    def __init__(self, stream: t.IO[bytes], limit: int) -> None:
        self._read = stream.read
        self._readline = stream.readline
        # Number of bytes handed out so far.
        self._pos = 0
        self.limit = limit

    def __iter__(self) -> "LimitedStream":
        return self

    @property
    def is_exhausted(self) -> bool:
        """If the stream is exhausted this attribute is `True`."""
        return self._pos >= self.limit

    def on_exhausted(self) -> bytes:
        """This is called when the stream tries to read past the limit.
        The return value of this function is returned from the reading
        function.
        """
        # Read null bytes from the stream so that we get the
        # correct end of stream marker.
        return self._read(0)

    def on_disconnect(self) -> bytes:
        """What should happen if a disconnect is detected?  The return
        value of this function is returned from read functions in case
        the client went away.  By default a
        :exc:`~werkzeug.exceptions.ClientDisconnected` exception is raised.
        """
        from .exceptions import ClientDisconnected

        raise ClientDisconnected()

    def exhaust(self, chunk_size: int = 1024 * 64) -> None:
        """Exhaust the stream.  This consumes all the data left until the
        limit is reached.

        :param chunk_size: the size for a chunk.  It will read the chunk
                           until the stream is exhausted and throw away
                           the results.
        """
        to_read = self.limit - self._pos
        chunk = chunk_size
        while to_read > 0:
            chunk = min(to_read, chunk)
            self.read(chunk)
            to_read -= chunk

    def read(self, size: t.Optional[int] = None) -> bytes:
        """Read `size` bytes or if size is not provided everything is read.

        :param size: the number of bytes read. ``None`` or any negative
            value reads everything up to the limit.
        """
        if self._pos >= self.limit:
            return self.on_exhausted()

        remaining = self.limit - self._pos

        # Every negative size means "read all", as with file objects.
        # Passing it on to the wrapped stream would read beyond the
        # limit.
        if size is None or size < 0:
            to_read = remaining
        else:
            to_read = min(remaining, size)

        try:
            read = self._read(to_read)
        except (OSError, ValueError):
            return self.on_disconnect()

        # Fewer bytes than requested within the limit means the client
        # went away before sending everything it announced.
        if to_read and len(read) != to_read:
            return self.on_disconnect()

        self._pos += len(read)
        return read

    def readline(self, size: t.Optional[int] = None) -> bytes:
        """Reads one line from the stream.

        :param size: the maximum number of bytes to return. ``None`` or
            any negative value allows a line as long as what is left up
            to the limit.
        """
        if self._pos >= self.limit:
            return self.on_exhausted()

        remaining = self.limit - self._pos

        # A negative size is the file convention for "no size limit".
        # It has to be replaced by the remaining byte count, otherwise
        # the wrapped stream would return a line that crosses the limit.
        if size is None or size < 0:
            size = remaining
        else:
            size = min(size, remaining)

        try:
            line = self._readline(size)
        except (ValueError, OSError):
            return self.on_disconnect()

        if size and not line:
            return self.on_disconnect()

        self._pos += len(line)
        return line

    def readlines(self, size: t.Optional[int] = None) -> t.List[bytes]:
        """Reads a file into a list of strings.  It calls :meth:`readline`
        until the file is read to the end.  It does support the optional
        `size` argument if the underlying stream supports it for
        `readline`.
        """
        last_pos = self._pos
        result = []
        if size is not None:
            end = min(self.limit, last_pos + size)
        else:
            end = self.limit
        while True:
            if size is not None:
                # NOTE(review): ``last_pos`` always equals ``self._pos``
                # at this point, so ``size`` is not actually changed;
                # the loop is bounded by ``end`` below.
                size -= last_pos - self._pos
            if self._pos >= end:
                break
            result.append(self.readline(size))
            if size is not None:
                last_pos = self._pos
        return result

    def tell(self) -> int:
        """Returns the position of the stream.

        .. versionadded:: 0.9
        """
        return self._pos

    def __next__(self) -> bytes:
        line = self.readline()
        if not line:
            raise StopIteration()
        return line

    def readable(self) -> bool:
        return True