Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/wsgi.py: 17%


import io
import re
import typing as t
import warnings
from functools import partial
from functools import update_wrapper
from itertools import chain

from ._internal import _make_encode_wrapper
from ._internal import _to_bytes
from ._internal import _to_str
from .sansio import utils as _sansio_utils
from .sansio.utils import host_is_trusted  # noqa: F401 # Imported as part of API
from .urls import _URLTuple
from .urls import uri_to_iri
from .urls import url_join
from .urls import url_parse
from .urls import url_quote

if t.TYPE_CHECKING:
    from _typeshed.wsgi import WSGIApplication
    from _typeshed.wsgi import WSGIEnvironment


def responder(f: t.Callable[..., "WSGIApplication"]) -> "WSGIApplication":
    """Marks a function as a responder. Decorate a function with it and it
    will automatically call the return value as a WSGI application.

    Example::

        @responder
        def application(environ, start_response):
            return Response('Hello World!')
    """
    return update_wrapper(lambda *a: f(*a)(*a[-2:]), f)
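

# Example (editor's addition, not part of the original module): a minimal
# sketch of using ``responder`` so that the returned Response is invoked as
# the WSGI application. ``Response`` is assumed to be
# ``werkzeug.wrappers.Response``; the import is done lazily here purely for
# illustration.
def _example_responder() -> "WSGIApplication":
    from werkzeug.wrappers import Response

    @responder
    def application(environ, start_response):
        return Response("Hello World!")

    return application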


def get_current_url(
    environ: "WSGIEnvironment",
    root_only: bool = False,
    strip_querystring: bool = False,
    host_only: bool = False,
    trusted_hosts: t.Optional[t.Iterable[str]] = None,
) -> str:
    """Recreate the URL for a request from the parts in a WSGI
    environment.

    The URL is an IRI, not a URI, so it may contain Unicode characters.
    Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.

    :param environ: The WSGI environment to get the URL parts from.
    :param root_only: Only build the root path, don't include the
        remaining path or query string.
    :param strip_querystring: Don't include the query string.
    :param host_only: Only build the scheme and host.
    :param trusted_hosts: A list of trusted host names to validate the
        host against.
    """
    parts = {
        "scheme": environ["wsgi.url_scheme"],
        "host": get_host(environ, trusted_hosts),
    }

    if not host_only:
        parts["root_path"] = environ.get("SCRIPT_NAME", "")

        if not root_only:
            parts["path"] = environ.get("PATH_INFO", "")

            if not strip_querystring:
                parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")

    return _sansio_utils.get_current_url(**parts)
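

# Example (editor's addition): rebuilding a request URL from a hand-written
# environ. The environ values and the expected result are illustrative
# assumptions, not taken from the original module.
def _example_get_current_url() -> str:
    environ = {
        "wsgi.url_scheme": "http",
        "HTTP_HOST": "example.com",
        "SCRIPT_NAME": "/app",
        "PATH_INFO": "/hello",
        "QUERY_STRING": "x=1",
    }
    # Expected to be 'http://example.com/app/hello?x=1'.
    return get_current_url(environ)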


def _get_server(
    environ: "WSGIEnvironment",
) -> t.Optional[t.Tuple[str, t.Optional[int]]]:
    name = environ.get("SERVER_NAME")

    if name is None:
        return None

    try:
        port: t.Optional[int] = int(environ.get("SERVER_PORT", None))
    except (TypeError, ValueError):
        # unix socket
        port = None

    return name, port


def get_host(
    environ: "WSGIEnvironment", trusted_hosts: t.Optional[t.Iterable[str]] = None
) -> str:
    """Return the host for the given WSGI environment.

    The ``Host`` header is preferred; ``SERVER_NAME`` is used if it's
    not set. The returned host will only contain the port if it is
    different from the standard port for the protocol.

    Optionally, verify that the host is trusted using
    :func:`host_is_trusted` and raise a
    :exc:`~werkzeug.exceptions.SecurityError` if it is not.

    :param environ: A WSGI environment dict.
    :param trusted_hosts: A list of trusted host names.

    :return: Host, with port if necessary.
    :raise ~werkzeug.exceptions.SecurityError: If the host is not
        trusted.
    """
    return _sansio_utils.get_host(
        environ["wsgi.url_scheme"],
        environ.get("HTTP_HOST"),
        _get_server(environ),
        trusted_hosts,
    )
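

# Example (editor's addition): a small sketch of how the default port is
# dropped when it matches the scheme. The environ is made up for
# illustration.
def _example_get_host() -> str:
    environ = {"wsgi.url_scheme": "https", "HTTP_HOST": "example.com:443"}
    # Expected to be 'example.com' since 443 is the default port for https.
    return get_host(environ)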


def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]:
    """Returns the content length from the WSGI environment as an
    integer. If it's not available or chunked transfer encoding is used,
    ``None`` is returned.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the content length from.
    """
    return _sansio_utils.get_content_length(
        http_content_length=environ.get("CONTENT_LENGTH"),
        http_transfer_encoding=environ.get("HTTP_TRANSFER_ENCODING", ""),
    )
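

# Example (editor's addition): illustrating the two cases described above, a
# declared Content-Length and a chunked request with no known length. The
# environ dicts are illustrative.
def _example_get_content_length() -> None:
    assert get_content_length({"CONTENT_LENGTH": "42"}) == 42
    assert get_content_length({"HTTP_TRANSFER_ENCODING": "chunked"}) is None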


def get_input_stream(
    environ: "WSGIEnvironment", safe_fallback: bool = True
) -> t.IO[bytes]:
    """Returns the input stream from the WSGI environment and wraps it
    in the most sensible way possible. The stream returned is not the
    raw WSGI stream in most cases but one that is safe to read from
    without taking into account the content length.

    If content length is not set, the stream will be empty for safety reasons.
    If the WSGI server supports chunked or infinite streams, it should set
    the ``wsgi.input_terminated`` value in the WSGI environ to indicate that.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the stream from.
    :param safe_fallback: use an empty stream as a safe fallback when the
        content length is not set. Disabling this allows infinite streams,
        which can be a denial-of-service risk.
    """
    stream = t.cast(t.IO[bytes], environ["wsgi.input"])
    content_length = get_content_length(environ)

    # A wsgi extension that tells us if the input is terminated. In
    # that case we return the stream unchanged as we know we can safely
    # read it until the end.
    if environ.get("wsgi.input_terminated"):
        return stream

    # If the request doesn't specify a content length, returning the stream is
    # potentially dangerous because it could be infinite, malicious or not. If
    # safe_fallback is true, return an empty stream instead for safety.
    if content_length is None:
        return io.BytesIO() if safe_fallback else stream

    # Otherwise limit the stream to the content length
    return t.cast(t.IO[bytes], LimitedStream(stream, content_length))
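

# Example (editor's addition): a sketch of the wrapping behaviour described
# above, using an in-memory stream in place of a real ``wsgi.input``.
def _example_get_input_stream() -> None:
    body = b"payload"
    environ = {"wsgi.input": io.BytesIO(body), "CONTENT_LENGTH": str(len(body))}
    # With a content length set, the stream is wrapped in LimitedStream.
    assert get_input_stream(environ).read() == body

    # Without a content length, the safe fallback is an empty stream.
    environ = {"wsgi.input": io.BytesIO(body)}
    assert get_input_stream(environ).read() == b""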


def get_query_string(environ: "WSGIEnvironment") -> str:
    """Returns the ``QUERY_STRING`` from the WSGI environment. This also
    takes care of the WSGI decoding dance. The string returned will be
    restricted to ASCII characters.

    :param environ: WSGI environment to get the query string from.

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionadded:: 0.9
    """
    warnings.warn(
        "'get_query_string' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )
    qs = environ.get("QUERY_STRING", "").encode("latin1")
    # QUERY_STRING really should be ascii safe but some browsers
    # will send us some unicode stuff (I am looking at you IE).
    # In that case we want to urllib quote it badly.
    return url_quote(qs, safe=":&%=+$!*'(),")


def get_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Return the ``PATH_INFO`` from the WSGI environment and decode it
    unless ``charset`` is ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path info, or ``None`` if no
        decoding should be performed.
    :param errors: The decoding error handling.

    .. versionadded:: 0.9
    """
    path = environ.get("PATH_INFO", "").encode("latin1")
    return _to_str(path, charset, errors, allow_none_charset=True)  # type: ignore
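

# Example (editor's addition): the "WSGI decoding dance" in practice. A UTF-8
# path arrives latin-1 decoded per the WSGI spec and is re-decoded here; the
# sample value is an assumption for illustration.
def _example_get_path_info() -> str:
    environ = {"PATH_INFO": "/caf\xc3\xa9"}  # '/café' as seen through latin-1
    # Expected to be '/café' after re-encoding to latin-1 and decoding as UTF-8.
    return get_path_info(environ)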


def get_script_name(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Return the ``SCRIPT_NAME`` from the WSGI environment and decode
    it unless `charset` is set to ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path, or ``None`` if no decoding
        should be performed.
    :param errors: The decoding error handling.

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionadded:: 0.9
    """
    warnings.warn(
        "'get_script_name' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )
    path = environ.get("SCRIPT_NAME", "").encode("latin1")
    return _to_str(path, charset, errors, allow_none_charset=True)  # type: ignore


def pop_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Removes and returns the next segment of `PATH_INFO`, pushing it onto
    `SCRIPT_NAME`. Returns `None` if there is nothing left on `PATH_INFO`.

    If the `charset` is set to `None` bytes are returned.

    If there are empty segments (``'/foo//bar'``) these are ignored but
    properly pushed to the `SCRIPT_NAME`:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> pop_path_info(env)
    'a'
    >>> env['SCRIPT_NAME']
    '/foo/a'
    >>> pop_path_info(env)
    'b'
    >>> env['SCRIPT_NAME']
    '/foo/a/b'

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
        The path is now decoded and a charset and encoding
        parameter can be provided.

    :param environ: the WSGI environment that is modified.
    :param charset: The ``encoding`` parameter passed to
        :func:`bytes.decode`.
    :param errors: The ``errors`` parameter passed to
        :func:`bytes.decode`.
    """
    warnings.warn(
        "'pop_path_info' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )

    path = environ.get("PATH_INFO")
    if not path:
        return None

    script_name = environ.get("SCRIPT_NAME", "")

    # shift multiple leading slashes over
    old_path = path
    path = path.lstrip("/")
    if path != old_path:
        script_name += "/" * (len(old_path) - len(path))

    if "/" not in path:
        environ["PATH_INFO"] = ""
        environ["SCRIPT_NAME"] = script_name + path
        rv = path.encode("latin1")
    else:
        segment, path = path.split("/", 1)
        environ["PATH_INFO"] = f"/{path}"
        environ["SCRIPT_NAME"] = script_name + segment
        rv = segment.encode("latin1")

    return _to_str(rv, charset, errors, allow_none_charset=True)  # type: ignore


def peek_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Returns the next segment on the `PATH_INFO` or `None` if there
    is none. Works like :func:`pop_path_info` without modifying the
    environment:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> peek_path_info(env)
    'a'
    >>> peek_path_info(env)
    'a'

    If the `charset` is set to `None` bytes are returned.

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
        The path is now decoded and a charset and encoding
        parameter can be provided.

    :param environ: the WSGI environment that is checked.
    """
    warnings.warn(
        "'peek_path_info' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )

    segments = environ.get("PATH_INFO", "").lstrip("/").split("/", 1)
    if segments:
        return _to_str(  # type: ignore
            segments[0].encode("latin1"), charset, errors, allow_none_charset=True
        )
    return None


def extract_path_info(
    environ_or_baseurl: t.Union[str, "WSGIEnvironment"],
    path_or_url: t.Union[str, _URLTuple],
    charset: str = "utf-8",
    errors: str = "werkzeug.url_quote",
    collapse_http_schemes: bool = True,
) -> t.Optional[str]:
    """Extracts the path info from the given URL (or WSGI environment) and
    path. The path info returned is a string. The URLs might also be IRIs.

    If the path info could not be determined, `None` is returned.

    Some examples:

    >>> extract_path_info('http://example.com/app', '/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello',
    ...                   collapse_http_schemes=False) is None
    True

    Instead of providing a base URL you can also pass a WSGI environment.

    :param environ_or_baseurl: a WSGI environment dict, a base URL or
                               base IRI. This is the root of the
                               application.
    :param path_or_url: an absolute path from the server root, a
                        relative path (in which case it's the path info)
                        or a full URL.
    :param charset: the charset for byte data in URLs
    :param errors: the error handling on decode
    :param collapse_http_schemes: if set to `False` the algorithm does
                                  not assume that http and https on the
                                  same server point to the same
                                  resource.

    .. deprecated:: 2.2
        Will be removed in Werkzeug 2.3.

    .. versionchanged:: 0.15
        The ``errors`` parameter defaults to leaving invalid bytes
        quoted instead of replacing them.

    .. versionadded:: 0.6

    """
    warnings.warn(
        "'extract_path_info' is deprecated and will be removed in Werkzeug 2.3.",
        DeprecationWarning,
        stacklevel=2,
    )

    def _normalize_netloc(scheme: str, netloc: str) -> str:
        parts = netloc.split("@", 1)[-1].split(":", 1)
        port: t.Optional[str]

        if len(parts) == 2:
            netloc, port = parts
            if (scheme == "http" and port == "80") or (
                scheme == "https" and port == "443"
            ):
                port = None
        else:
            netloc = parts[0]
            port = None

        if port is not None:
            netloc += f":{port}"

        return netloc

    # make sure whatever we are working on is an IRI and parse it
    path = uri_to_iri(path_or_url, charset, errors)
    if isinstance(environ_or_baseurl, dict):
        environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True)
    base_iri = uri_to_iri(environ_or_baseurl, charset, errors)
    base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
    cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]

    # normalize the network location
    base_netloc = _normalize_netloc(base_scheme, base_netloc)
    cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)

    # is that IRI even on a known HTTP scheme?
    if collapse_http_schemes:
        for scheme in base_scheme, cur_scheme:
            if scheme not in ("http", "https"):
                return None
    else:
        if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
            return None

    # are the netlocs compatible?
    if base_netloc != cur_netloc:
        return None

    # are we below the application path?
    base_path = base_path.rstrip("/")
    if not cur_path.startswith(base_path):
        return None

    return f"/{cur_path[len(base_path) :].lstrip('/')}"


class ClosingIterator:
    """The WSGI specification requires that all middlewares and gateways
    respect the `close` callback of the iterable returned by the application.
    Because it is useful to add another close action to a returned iterable
    and adding a custom iterable is a boring task, this class can be used for
    that::

        return ClosingIterator(app(environ, start_response), [cleanup_session,
                                                              cleanup_locals])

    If there is just one close function it can be passed instead of the list.

    A closing iterator is not needed if the application uses response objects
    and finishes the processing if the response is started::

        try:
            return response(environ, start_response)
        finally:
            cleanup_session()
            cleanup_locals()
    """

    def __init__(
        self,
        iterable: t.Iterable[bytes],
        callbacks: t.Optional[
            t.Union[t.Callable[[], None], t.Iterable[t.Callable[[], None]]]
        ] = None,
    ) -> None:
        iterator = iter(iterable)
        self._next = t.cast(t.Callable[[], bytes], partial(next, iterator))
        if callbacks is None:
            callbacks = []
        elif callable(callbacks):
            callbacks = [callbacks]
        else:
            callbacks = list(callbacks)
        iterable_close = getattr(iterable, "close", None)
        if iterable_close:
            callbacks.insert(0, iterable_close)
        self._callbacks = callbacks

    def __iter__(self) -> "ClosingIterator":
        return self

    def __next__(self) -> bytes:
        return self._next()

    def close(self) -> None:
        for callback in self._callbacks:
            callback()
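

# Example (editor's addition): a hypothetical middleware that attaches extra
# cleanup callbacks to the application's response iterable. ``app``,
# ``cleanup_session`` and ``cleanup_locals`` are placeholders.
def _example_closing_iterator_middleware(app, cleanup_session, cleanup_locals):
    def middleware(environ, start_response):
        return ClosingIterator(
            app(environ, start_response), [cleanup_session, cleanup_locals]
        )

    return middleware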


def wrap_file(
    environ: "WSGIEnvironment", file: t.IO[bytes], buffer_size: int = 8192
) -> t.Iterable[bytes]:
    """Wraps a file. This uses the WSGI server's file wrapper if available
    or otherwise the generic :class:`FileWrapper`.

    .. versionadded:: 0.5

    If the file wrapper from the WSGI server is used it's important to not
    iterate over it from inside the application but to pass it through
    unchanged. If you want to pass out a file wrapper inside a response
    object you have to set :attr:`Response.direct_passthrough` to `True`.

    More information about file wrappers is available in :pep:`333`.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """
    return environ.get("wsgi.file_wrapper", FileWrapper)(  # type: ignore
        file, buffer_size
    )
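

# Example (editor's addition): when the server does not provide
# ``wsgi.file_wrapper``, ``wrap_file`` falls back to the generic
# FileWrapper. The environ and data are made up for illustration.
def _example_wrap_file() -> t.Iterable[bytes]:
    environ = {}  # no 'wsgi.file_wrapper' key, so FileWrapper is used
    data = io.BytesIO(b"x" * 10000)
    return wrap_file(environ, data, buffer_size=4096)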


class FileWrapper:
    """This class can be used to convert a :class:`file`-like object into
    an iterable. It yields `buffer_size` blocks until the file is fully
    read.

    You should not use this class directly but rather use the
    :func:`wrap_file` function that uses the WSGI server's file wrapper
    support if it's available.

    .. versionadded:: 0.5

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """

    def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
        self.file = file
        self.buffer_size = buffer_size

    def close(self) -> None:
        if hasattr(self.file, "close"):
            self.file.close()

    def seekable(self) -> bool:
        if hasattr(self.file, "seekable"):
            return self.file.seekable()
        if hasattr(self.file, "seek"):
            return True
        return False

    def seek(self, *args: t.Any) -> None:
        if hasattr(self.file, "seek"):
            self.file.seek(*args)

    def tell(self) -> t.Optional[int]:
        if hasattr(self.file, "tell"):
            return self.file.tell()
        return None

    def __iter__(self) -> "FileWrapper":
        return self

    def __next__(self) -> bytes:
        data = self.file.read(self.buffer_size)
        if data:
            return data
        raise StopIteration()
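

# Example (editor's addition): iterating a file-like object in fixed-size
# blocks. With 20000 bytes and an 8192-byte buffer this should yield three
# chunks of 8192, 8192 and 3616 bytes.
def _example_file_wrapper() -> t.List[bytes]:
    data = io.BytesIO(b"a" * 20000)
    return list(FileWrapper(data, buffer_size=8192))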


class _RangeWrapper:
    # private for now, but should we make it public in the future?

    """This class can be used to convert an iterable object into
    an iterable that will only yield a piece of the underlying content.
    It yields blocks until the underlying stream range is fully read.
    The yielded blocks will never be larger than the block size of the
    original iterator, but they can be smaller.

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param iterable: an iterable object with a :meth:`__next__` method.
    :param start_byte: byte from which read will start.
    :param byte_range: how many bytes to read.
    """

    def __init__(
        self,
        iterable: t.Union[t.Iterable[bytes], t.IO[bytes]],
        start_byte: int = 0,
        byte_range: t.Optional[int] = None,
    ):
        self.iterable = iter(iterable)
        self.byte_range = byte_range
        self.start_byte = start_byte
        self.end_byte = None

        if byte_range is not None:
            self.end_byte = start_byte + byte_range

        self.read_length = 0
        self.seekable = hasattr(iterable, "seekable") and iterable.seekable()
        self.end_reached = False

    def __iter__(self) -> "_RangeWrapper":
        return self

    def _next_chunk(self) -> bytes:
        try:
            chunk = next(self.iterable)
            self.read_length += len(chunk)
            return chunk
        except StopIteration:
            self.end_reached = True
            raise

    def _first_iteration(self) -> t.Tuple[t.Optional[bytes], int]:
        chunk = None
        if self.seekable:
            self.iterable.seek(self.start_byte)  # type: ignore
            self.read_length = self.iterable.tell()  # type: ignore
            contextual_read_length = self.read_length
        else:
            while self.read_length <= self.start_byte:
                chunk = self._next_chunk()
            if chunk is not None:
                chunk = chunk[self.start_byte - self.read_length :]
            contextual_read_length = self.start_byte
        return chunk, contextual_read_length

    def _next(self) -> bytes:
        if self.end_reached:
            raise StopIteration()
        chunk = None
        contextual_read_length = self.read_length
        if self.read_length == 0:
            chunk, contextual_read_length = self._first_iteration()
        if chunk is None:
            chunk = self._next_chunk()
        if self.end_byte is not None and self.read_length >= self.end_byte:
            self.end_reached = True
            return chunk[: self.end_byte - contextual_read_length]
        return chunk

    def __next__(self) -> bytes:
        chunk = self._next()
        if chunk:
            return chunk
        self.end_reached = True
        raise StopIteration()

    def close(self) -> None:
        if hasattr(self.iterable, "close"):
            self.iterable.close()
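

# Example (editor's addition): serving only a slice of an underlying stream,
# e.g. for a Range request. The expected result is an assumption based on a
# reading of the class above.
def _example_range_wrapper() -> bytes:
    data = io.BytesIO(b"0123456789")
    # Start at byte 2 and read at most 5 bytes; expected b'23456'.
    return b"".join(_RangeWrapper(data, start_byte=2, byte_range=5))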


def _make_chunk_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    limit: t.Optional[int],
    buffer_size: int,
) -> t.Iterator[bytes]:
    """Helper for the line and chunk iter functions."""
    if isinstance(stream, (bytes, bytearray, str)):
        raise TypeError(
            "Passed a string or byte object instead of true iterator or stream."
        )
    if not hasattr(stream, "read"):
        for item in stream:
            if item:
                yield item
        return
    stream = t.cast(t.IO[bytes], stream)
    if not isinstance(stream, LimitedStream) and limit is not None:
        stream = t.cast(t.IO[bytes], LimitedStream(stream, limit))
    _read = stream.read
    while True:
        item = _read(buffer_size)
        if not item:
            break
        yield item


def make_line_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Safely iterates over an input stream line by line. If the input stream
    is not a :class:`LimitedStream` the `limit` parameter is mandatory.

    This uses the stream's :meth:`~file.read` method internally as opposed
    to the :meth:`~file.readline` method that is unsafe and can only be used
    in violation of the WSGI specification. The same problem applies to the
    `__iter__` function of the input stream which calls :meth:`~file.readline`
    without arguments.

    If you need line-by-line processing it's strongly recommended to iterate
    over the input stream using this helper function.

    .. versionchanged:: 0.8
        This function now ensures that the limit was reached.

    .. versionadded:: 0.9
        added support for iterators as input stream.

    .. versionadded:: 0.11.10
        added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterable to iterate over.
    :param limit: the limit in bytes for the stream. (Usually
                  content length. Not necessary if the `stream`
                  is a :class:`LimitedStream`.)
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size. Internally the buffer may
                          still grow to twice the buffer size, however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    first_item = next(_iter, "")
    if not first_item:
        return

    s = _make_encode_wrapper(first_item)
    empty = t.cast(bytes, s(""))
    cr = t.cast(bytes, s("\r"))
    lf = t.cast(bytes, s("\n"))
    crlf = t.cast(bytes, s("\r\n"))

    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))

    def _iter_basic_lines() -> t.Iterator[bytes]:
        _join = empty.join
        buffer: t.List[bytes] = []
        while True:
            new_data = next(_iter, "")
            if not new_data:
                break
            new_buf: t.List[bytes] = []
            buf_size = 0
            for item in t.cast(
                t.Iterator[bytes], chain(buffer, new_data.splitlines(True))
            ):
                new_buf.append(item)
                buf_size += len(item)
                if item and item[-1:] in crlf:
                    yield _join(new_buf)
                    new_buf = []
                elif cap_at_buffer and buf_size >= buffer_size:
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
            buffer = new_buf
        if buffer:
            yield _join(buffer)

    # This hackery is necessary to merge 'foo\r' and '\n' into one item
    # of 'foo\r\n' if we were unlucky and we hit a chunk boundary.
    previous = empty
    for item in _iter_basic_lines():
        if item == lf and previous[-1:] == cr:
            previous += item
            item = empty
        if previous:
            yield previous
        previous = item
    if previous:
        yield previous
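

# Example (editor's addition): iterating a raw stream line by line without
# relying on ``readline``. The input and expected lines are illustrative.
def _example_make_line_iter() -> t.List[bytes]:
    stream = io.BytesIO(b"first\r\nsecond\nlast")
    # ``limit`` is required because the stream is not a LimitedStream.
    # Expected: [b'first\r\n', b'second\n', b'last']
    return list(make_line_iter(stream, limit=18))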


def make_chunk_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    separator: bytes,
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Works like :func:`make_line_iter` but accepts a separator
    which divides chunks. If you want newline based processing
    you should use :func:`make_line_iter` instead as it
    supports arbitrary newline markers.

    .. versionadded:: 0.8

    .. versionadded:: 0.9
        added support for iterators as input stream.

    .. versionadded:: 0.11.10
        added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterable to iterate over.
    :param separator: the separator that divides chunks.
    :param limit: the limit in bytes for the stream. (Usually
                  content length. Not necessary if the `stream`
                  is otherwise already limited).
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size. Internally the buffer may
                          still grow to twice the buffer size, however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    first_item = next(_iter, b"")
    if not first_item:
        return

    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
    if isinstance(first_item, str):
        separator = _to_str(separator)
        _split = re.compile(f"({re.escape(separator)})").split
        _join = "".join
    else:
        separator = _to_bytes(separator)
        _split = re.compile(b"(" + re.escape(separator) + b")").split
        _join = b"".join

    buffer: t.List[bytes] = []
    while True:
        new_data = next(_iter, b"")
        if not new_data:
            break
        chunks = _split(new_data)
        new_buf: t.List[bytes] = []
        buf_size = 0
        for item in chain(buffer, chunks):
            if item == separator:
                yield _join(new_buf)
                new_buf = []
                buf_size = 0
            else:
                buf_size += len(item)
                new_buf.append(item)

                if cap_at_buffer and buf_size >= buffer_size:
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
                    buf_size = len(rv)

        buffer = new_buf
    if buffer:
        yield _join(buffer)
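

# Example (editor's addition): splitting a stream on an arbitrary separator
# instead of newlines. The input and expected chunks are illustrative.
def _example_make_chunk_iter() -> t.List[bytes]:
    stream = io.BytesIO(b"a|bb|ccc")
    # Expected: [b'a', b'bb', b'ccc']
    return list(make_chunk_iter(stream, b"|", limit=8))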


class LimitedStream(io.IOBase):
    """Wraps a stream so that it doesn't read more than n bytes. If the
    stream is exhausted and the caller tries to get more bytes from it
    :func:`on_exhausted` is called which by default returns an empty
    string. The return value of that function is forwarded
    to the reader function. So if it returns an empty string
    :meth:`read` will return an empty string as well.

    The limit however must never be higher than what the stream can
    output. Otherwise :meth:`readlines` will try to read past the
    limit.

    .. admonition:: Note on WSGI compliance

        Calls to :meth:`readline` and :meth:`readlines` are not
        WSGI compliant because they pass a size argument to the
        readline methods. Unfortunately the WSGI PEP is not safely
        implementable without a size argument to :meth:`readline`
        because there is no EOF marker in the stream. As a result
        of that the use of :meth:`readline` is discouraged.

        For the same reason iterating over the :class:`LimitedStream`
        is not portable. It internally calls :meth:`readline`.

        We strongly suggest using :meth:`read` only or using
        :func:`make_line_iter`, which safely iterates line by line
        over a WSGI input stream.

    :param stream: the stream to wrap.
    :param limit: the limit for the stream, must not be longer than
                  what the stream can provide if the stream does not
                  end with `EOF` (like `wsgi.input`)
    """

    def __init__(self, stream: t.IO[bytes], limit: int) -> None:
        self._read = stream.read
        self._readline = stream.readline
        self._pos = 0
        self.limit = limit

    def __iter__(self) -> "LimitedStream":
        return self

    @property
    def is_exhausted(self) -> bool:
        """If the stream is exhausted this attribute is `True`."""
        return self._pos >= self.limit

    def on_exhausted(self) -> bytes:
        """This is called when the stream tries to read past the limit.
        The return value of this function is returned from the reading
        function.
        """
        # Read null bytes from the stream so that we get the
        # correct end of stream marker.
        return self._read(0)

    def on_disconnect(self) -> bytes:
        """What should happen if a disconnect is detected? The return
        value of this function is returned from read functions in case
        the client went away. By default a
        :exc:`~werkzeug.exceptions.ClientDisconnected` exception is raised.
        """
        from .exceptions import ClientDisconnected

        raise ClientDisconnected()

    def _exhaust_chunks(self, chunk_size: int = 1024 * 64) -> t.Iterator[bytes]:
        """Exhaust the stream by reading until the limit is reached or the client
        disconnects, yielding each chunk.

        :param chunk_size: How many bytes to read at a time.

        :meta private:

        .. versionadded:: 2.2.3
        """
        to_read = self.limit - self._pos

        while to_read > 0:
            chunk = self.read(min(to_read, chunk_size))
            yield chunk
            to_read -= len(chunk)

    def exhaust(self, chunk_size: int = 1024 * 64) -> None:
        """Exhaust the stream by reading until the limit is reached or the client
        disconnects, discarding the data.

        :param chunk_size: How many bytes to read at a time.

        .. versionchanged:: 2.2.3
            Handle case where wrapped stream returns fewer bytes than requested.
        """
        for _ in self._exhaust_chunks(chunk_size):
            pass

    def read(self, size: t.Optional[int] = None) -> bytes:
        """Read up to ``size`` bytes from the underlying stream. If size is not
        provided, read until the limit.

        If the limit is reached, :meth:`on_exhausted` is called, which returns empty
        bytes.

        If no bytes are read and the limit is not reached, or if an error occurs during
        the read, :meth:`on_disconnect` is called, which raises
        :exc:`.ClientDisconnected`.

        :param size: The number of bytes to read. ``None``, default, reads until the
            limit is reached.

        .. versionchanged:: 2.2.3
            Handle case where wrapped stream returns fewer bytes than requested.
        """
        if self._pos >= self.limit:
            return self.on_exhausted()

        if size is None or size == -1:  # -1 is for consistency with file
            # Keep reading from the wrapped stream until the limit is reached. Can't
            # rely on stream.read(size) because it's not guaranteed to return size.
            buf = bytearray()

            for chunk in self._exhaust_chunks():
                buf.extend(chunk)

            return bytes(buf)

        to_read = min(self.limit - self._pos, size)

        try:
            read = self._read(to_read)
        except (OSError, ValueError):
            return self.on_disconnect()

        if to_read and not len(read):
            # If no data was read, treat it as a disconnect. As long as some data was
            # read, a subsequent call can still return more before reaching the limit.
            return self.on_disconnect()

        self._pos += len(read)
        return read

    def readline(self, size: t.Optional[int] = None) -> bytes:
        """Reads one line from the stream."""
        if self._pos >= self.limit:
            return self.on_exhausted()
        if size is None:
            size = self.limit - self._pos
        else:
            size = min(size, self.limit - self._pos)
        try:
            line = self._readline(size)
        except (ValueError, OSError):
            return self.on_disconnect()
        if size and not line:
            return self.on_disconnect()
        self._pos += len(line)
        return line

    def readlines(self, size: t.Optional[int] = None) -> t.List[bytes]:
        """Reads a file into a list of strings. It calls :meth:`readline`
        until the file is read to the end. It does support the optional
        `size` argument if the underlying stream supports it for
        `readline`.
        """
        last_pos = self._pos
        result = []
        if size is not None:
            end = min(self.limit, last_pos + size)
        else:
            end = self.limit
        while True:
            if size is not None:
                size -= last_pos - self._pos
            if self._pos >= end:
                break
            result.append(self.readline(size))
            if size is not None:
                last_pos = self._pos
        return result

    def tell(self) -> int:
        """Returns the position of the stream.

        .. versionadded:: 0.9
        """
        return self._pos

    def __next__(self) -> bytes:
        line = self.readline()
        if not line:
            raise StopIteration()
        return line

    def readable(self) -> bool:
        return True
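

# Example (editor's addition): a small sketch of how LimitedStream bounds
# reads at the declared limit even if the wrapped stream has more data.
def _example_limited_stream() -> None:
    stream = io.BytesIO(b"body plus trailing garbage")
    limited = LimitedStream(stream, limit=4)
    assert limited.read() == b"body"
    assert limited.is_exhausted
    # Reading past the limit calls on_exhausted(), which returns b"".
    assert limited.read() == b""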