Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/wsgi.py: 35%

337 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-09 06:08 +0000

1from __future__ import annotations 

2 

3import io 

4import re 

5import typing as t 

6import warnings 

7from functools import partial 

8from functools import update_wrapper 

9from itertools import chain 

10 

11from ._internal import _make_encode_wrapper 

12from ._internal import _to_bytes 

13from ._internal import _to_str 

14from .exceptions import ClientDisconnected 

15from .exceptions import RequestEntityTooLarge 

16from .sansio import utils as _sansio_utils 

17from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API 

18 

19if t.TYPE_CHECKING: 

20 from _typeshed.wsgi import WSGIApplication 

21 from _typeshed.wsgi import WSGIEnvironment 

22 

23 

def responder(f: t.Callable[..., WSGIApplication]) -> WSGIApplication:
    """Marks a function as responder. Decorate a function with it and it
    will automatically call the return value as WSGI application.

    Example::

        @responder
        def application(environ, start_response):
            return Response('Hello World!')
    """

    def wrapper(*args: t.Any) -> t.Any:
        # Call the factory with all arguments, then invoke the WSGI
        # application it returned with the trailing two arguments,
        # which are (environ, start_response).
        return f(*args)(*args[-2:])

    return update_wrapper(wrapper, f)

35 

36 

def get_current_url(
    environ: WSGIEnvironment,
    root_only: bool = False,
    strip_querystring: bool = False,
    host_only: bool = False,
    trusted_hosts: t.Iterable[str] | None = None,
) -> str:
    """Recreate the URL for a request from the parts in a WSGI
    environment.

    The URL is an IRI, not a URI, so it may contain Unicode characters.
    Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.

    :param environ: The WSGI environment to get the URL parts from.
    :param root_only: Only build the root path, don't include the
        remaining path or query string.
    :param strip_querystring: Don't include the query string.
    :param host_only: Only build the scheme and host.
    :param trusted_hosts: A list of trusted host names to validate the
        host against.
    """
    kwargs: dict[str, t.Any] = {
        "scheme": environ["wsgi.url_scheme"],
        "host": get_host(environ, trusted_hosts),
    }

    # Each level of detail builds on the previous one: host -> root
    # path -> path -> query string.
    if host_only:
        return _sansio_utils.get_current_url(**kwargs)

    kwargs["root_path"] = environ.get("SCRIPT_NAME", "")

    if not root_only:
        kwargs["path"] = environ.get("PATH_INFO", "")

        if not strip_querystring:
            # WSGI strings are "bytes in latin1"; pass the raw bytes on.
            kwargs["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")

    return _sansio_utils.get_current_url(**kwargs)

73 

74 

75def _get_server( 

76 environ: WSGIEnvironment, 

77) -> tuple[str, int | None] | None: 

78 name = environ.get("SERVER_NAME") 

79 

80 if name is None: 

81 return None 

82 

83 try: 

84 port: int | None = int(environ.get("SERVER_PORT", None)) 

85 except (TypeError, ValueError): 

86 # unix socket 

87 port = None 

88 

89 return name, port 

90 

91 

def get_host(
    environ: WSGIEnvironment, trusted_hosts: t.Iterable[str] | None = None
) -> str:
    """Return the host for the given WSGI environment.

    The ``Host`` header is preferred, then ``SERVER_NAME`` if it's not
    set. The returned host will only contain the port if it is different
    than the standard port for the protocol.

    Optionally, verify that the host is trusted using
    :func:`host_is_trusted` and raise a
    :exc:`~werkzeug.exceptions.SecurityError` if it is not.

    :param environ: A WSGI environment dict.
    :param trusted_hosts: A list of trusted host names.

    :return: Host, with port if necessary.
    :raise ~werkzeug.exceptions.SecurityError: If the host is not
        trusted.
    """
    # Delegate to the sansio implementation; it prefers HTTP_HOST and
    # falls back to the (SERVER_NAME, SERVER_PORT) pair from _get_server.
    return _sansio_utils.get_host(
        environ["wsgi.url_scheme"],
        environ.get("HTTP_HOST"),
        _get_server(environ),
        trusted_hosts,
    )

118 

119 

def get_content_length(environ: WSGIEnvironment) -> int | None:
    """Return the ``Content-Length`` header value as an int. If the header is not given
    or the ``Transfer-Encoding`` header is ``chunked``, ``None`` is returned to indicate
    a streaming request. If the value is not an integer, or negative, 0 is returned.

    :param environ: The WSGI environ to get the content length from.

    .. versionadded:: 0.9
    """
    # Parsing and the chunked/invalid-value rules live in the sansio
    # helper; this only extracts the relevant environ keys.
    return _sansio_utils.get_content_length(
        http_content_length=environ.get("CONTENT_LENGTH"),
        http_transfer_encoding=environ.get("HTTP_TRANSFER_ENCODING"),
    )

133 

134 

def get_input_stream(
    environ: WSGIEnvironment,
    safe_fallback: bool = True,
    max_content_length: int | None = None,
) -> t.IO[bytes]:
    """Return the WSGI input stream, wrapped so that it may be read safely without going
    past the ``Content-Length`` header value or ``max_content_length``.

    If ``Content-Length`` exceeds ``max_content_length``, a
    :exc:`RequestEntityTooLarge`` ``413 Content Too Large`` error is raised.

    If the WSGI server sets ``environ["wsgi.input_terminated"]``, it indicates that the
    server handles terminating the stream, so it is safe to read directly. For example,
    a server that knows how to handle chunked requests safely would set this.

    If ``max_content_length`` is set, it can be enforced on streams if
    ``wsgi.input_terminated`` is set. Otherwise, an empty stream is returned unless the
    user explicitly disables this safe fallback.

    If the limit is reached before the underlying stream is exhausted (such as a file
    that is too large, or an infinite stream), the remaining contents of the stream
    cannot be read safely. Depending on how the server handles this, clients may show a
    "connection reset" failure instead of seeing the 413 response.

    :param environ: The WSGI environ containing the stream.
    :param safe_fallback: Return an empty stream when ``Content-Length`` is not set.
        Disabling this allows infinite streams, which can be a denial-of-service risk.
    :param max_content_length: The maximum length that content-length or streaming
        requests may not exceed.

    .. versionchanged:: 2.3.2
        ``max_content_length`` is only applied to streaming requests if the server sets
        ``wsgi.input_terminated``.

    .. versionchanged:: 2.3
        Check ``max_content_length`` and raise an error if it is exceeded.

    .. versionadded:: 0.9
    """
    stream = t.cast(t.IO[bytes], environ["wsgi.input"])
    content_length = get_content_length(environ)

    # A declared Content-Length beyond the limit can be rejected up
    # front, before reading any of the body.
    if content_length is not None and max_content_length is not None:
        if content_length > max_content_length:
            raise RequestEntityTooLarge()

    # A WSGI server can set this to indicate that it terminates the input stream. In
    # that case the stream is safe without wrapping, or can enforce a max length.
    if "wsgi.input_terminated" in environ:
        if max_content_length is not None:
            # If this is moved above, it can cause the stream to hang if a read attempt
            # is made when the client sends no data. For example, the development server
            # does not handle buffering except for chunked encoding.
            return t.cast(
                t.IO[bytes], LimitedStream(stream, max_content_length, is_max=True)
            )

        return stream

    # No limit given, return an empty stream unless the user explicitly allows the
    # potentially infinite stream. An infinite stream is dangerous if it's not expected,
    # as it can tie up a worker indefinitely.
    if content_length is None:
        return io.BytesIO() if safe_fallback else stream

    # Content-Length is known and within any limit; wrap so reads cannot
    # go past it.
    return t.cast(t.IO[bytes], LimitedStream(stream, content_length))

201 

202 

def get_path_info(
    environ: WSGIEnvironment,
    charset: t.Any = ...,
    errors: str | None = None,
) -> str:
    """Return ``PATH_INFO`` from the WSGI environment.

    :param environ: WSGI environment to get the path from.

    .. versionchanged:: 2.3
        The ``charset`` and ``errors`` parameters are deprecated and will be removed in
        Werkzeug 3.0.

    .. versionadded:: 0.9
    """
    # Both parameters are deprecated; warn whenever a caller passes one,
    # then normalize to the values that will be the only behavior in 3.0.
    if charset is ...:
        charset = "utf-8"
    else:
        warnings.warn(
            "The 'charset' parameter is deprecated and will be removed"
            " in Werkzeug 3.0.",
            DeprecationWarning,
            stacklevel=2,
        )

        if charset is None:
            charset = "utf-8"

    if errors is None:
        errors = "replace"
    else:
        warnings.warn(
            "The 'errors' parameter is deprecated and will be removed in Werkzeug 3.0",
            DeprecationWarning,
            stacklevel=2,
        )

    # WSGI strings carry bytes as latin1; round-trip to decode with the
    # real charset.
    path = environ.get("PATH_INFO", "").encode("latin1")
    return path.decode(charset, errors)  # type: ignore[no-any-return]

242 

243 

class ClosingIterator:
    """Wrap a WSGI iterable so extra ``close`` callbacks run when it is
    closed.

    The WSGI specification requires that all middlewares and gateways
    respect the ``close`` callback of the iterable returned by the
    application. This class forwards iteration to the wrapped iterable
    and, on ``close``, first calls the iterable's own ``close`` (if any)
    and then each extra callback::

        return ClosingIterator(app(environ, start_response), [cleanup_session,
                                                              cleanup_locals])

    A single callable may be passed instead of a list. A closing iterator
    is not needed if the application uses response objects and finishes
    the processing if the response is started::

        try:
            return response(environ, start_response)
        finally:
            cleanup_session()
            cleanup_locals()
    """

    def __init__(
        self,
        iterable: t.Iterable[bytes],
        callbacks: None
        | (t.Callable[[], None] | t.Iterable[t.Callable[[], None]]) = None,
    ) -> None:
        self._next = t.cast(t.Callable[[], bytes], partial(next, iter(iterable)))

        # Normalize callbacks to a list: nothing, one callable, or many.
        if callbacks is None:
            cleanups: list[t.Callable[[], None]] = []
        elif callable(callbacks):
            cleanups = [callbacks]
        else:
            cleanups = list(callbacks)

        # The wrapped iterable's own close must run before the extras.
        iterable_close = getattr(iterable, "close", None)

        if iterable_close:
            cleanups.insert(0, iterable_close)

        self._callbacks = cleanups

    def __iter__(self) -> ClosingIterator:
        return self

    def __next__(self) -> bytes:
        return self._next()

    def close(self) -> None:
        for callback in self._callbacks:
            callback()

294 

295 

def wrap_file(
    environ: WSGIEnvironment, file: t.IO[bytes], buffer_size: int = 8192
) -> t.Iterable[bytes]:
    """Wraps a file. This uses the WSGI server's file wrapper if available
    or otherwise the generic :class:`FileWrapper`.

    .. versionadded:: 0.5

    If the file wrapper from the WSGI server is used it's important to not
    iterate over it from inside the application but to pass it through
    unchanged. If you want to pass out a file wrapper inside a response
    object you have to set :attr:`Response.direct_passthrough` to `True`.

    More information about file wrappers are available in :pep:`333`.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """
    # Prefer the server-provided wrapper (e.g. one that uses sendfile);
    # fall back to the pure-Python FileWrapper.
    wrapper = environ.get("wsgi.file_wrapper", FileWrapper)
    return wrapper(file, buffer_size)  # type: ignore[operator]

317 

318 

class FileWrapper:
    """Convert a :class:`file`-like object into an iterable that yields
    ``buffer_size`` blocks until the file is fully read.

    You should not use this class directly but rather use the
    :func:`wrap_file` function that uses the WSGI server's file wrapper
    support if it's available.

    .. versionadded:: 0.5

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """

    def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
        self.file = file
        self.buffer_size = buffer_size

    def close(self) -> None:
        # The wrapped object may be a bare reader without close support.
        close = getattr(self.file, "close", None)

        if close is not None:
            close()

    def seekable(self) -> bool:
        # Prefer the file's own answer; otherwise infer from a seek method.
        if hasattr(self.file, "seekable"):
            return self.file.seekable()

        return hasattr(self.file, "seek")

    def seek(self, *args: t.Any) -> None:
        seek = getattr(self.file, "seek", None)

        if seek is not None:
            seek(*args)

    def tell(self) -> int | None:
        tell = getattr(self.file, "tell", None)
        return None if tell is None else tell()

    def __iter__(self) -> FileWrapper:
        return self

    def __next__(self) -> bytes:
        data = self.file.read(self.buffer_size)

        if not data:
            raise StopIteration()

        return data

369 

370 

class _RangeWrapper:
    # private for now, but should we make it public in the future ?

    """This class can be used to convert an iterable object into
    an iterable that will only yield a piece of the underlying content.
    It yields blocks until the underlying stream range is fully read.
    The yielded blocks will have a size that can't exceed the original
    iterator defined block size, but that can be smaller.

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param iterable: an iterable object with a :meth:`__next__` method.
    :param start_byte: byte from which read will start.
    :param byte_range: how many bytes to read.
    """

    def __init__(
        self,
        iterable: t.Iterable[bytes] | t.IO[bytes],
        start_byte: int = 0,
        byte_range: int | None = None,
    ):
        self.iterable = iter(iterable)
        self.byte_range = byte_range
        self.start_byte = start_byte
        # Exclusive end position; None means "until the source is exhausted".
        self.end_byte: int | None = None

        if byte_range is not None:
            self.end_byte = start_byte + byte_range

        # Total bytes consumed from the source so far (not bytes yielded).
        self.read_length = 0
        # When the source is seekable we can jump to start_byte instead of
        # reading and discarding up to it.
        self.seekable = hasattr(iterable, "seekable") and iterable.seekable()
        self.end_reached = False

    def __iter__(self) -> _RangeWrapper:
        return self

    def _next_chunk(self) -> bytes:
        # Pull one chunk from the source, tracking how much was consumed.
        try:
            chunk = next(self.iterable)
            self.read_length += len(chunk)
            return chunk
        except StopIteration:
            self.end_reached = True
            raise

    def _first_iteration(self) -> tuple[bytes | None, int]:
        """Position the source at ``start_byte``.

        Returns the first (already-trimmed) chunk, or ``None`` if the next
        chunk should be fetched normally, along with the read length to use
        as the baseline when trimming against ``end_byte``.
        """
        chunk = None
        if self.seekable:
            self.iterable.seek(self.start_byte)  # type: ignore
            self.read_length = self.iterable.tell()  # type: ignore
            contextual_read_length = self.read_length
        else:
            # Read and discard chunks until the window start falls inside one.
            while self.read_length <= self.start_byte:
                chunk = self._next_chunk()
            if chunk is not None:
                # Drop the part of the chunk that precedes start_byte.
                chunk = chunk[self.start_byte - self.read_length :]
            contextual_read_length = self.start_byte
        return chunk, contextual_read_length

    def _next(self) -> bytes:
        if self.end_reached:
            raise StopIteration()
        chunk = None
        contextual_read_length = self.read_length
        if self.read_length == 0:
            chunk, contextual_read_length = self._first_iteration()
        if chunk is None:
            chunk = self._next_chunk()
        if self.end_byte is not None and self.read_length >= self.end_byte:
            # This chunk crosses the end of the range: trim and stop after it.
            self.end_reached = True
            return chunk[: self.end_byte - contextual_read_length]
        return chunk

    def __next__(self) -> bytes:
        chunk = self._next()
        if chunk:
            return chunk
        # An empty trimmed chunk means the range is complete.
        self.end_reached = True
        raise StopIteration()

    def close(self) -> None:
        if hasattr(self.iterable, "close"):
            self.iterable.close()

456 

457 

458def _make_chunk_iter( 

459 stream: t.Iterable[bytes] | t.IO[bytes], 

460 limit: int | None, 

461 buffer_size: int, 

462) -> t.Iterator[bytes]: 

463 """Helper for the line and chunk iter functions.""" 

464 warnings.warn( 

465 "'_make_chunk_iter' is deprecated and will be removed in Werkzeug 3.0.", 

466 DeprecationWarning, 

467 stacklevel=2, 

468 ) 

469 

470 if isinstance(stream, (bytes, bytearray, str)): 

471 raise TypeError( 

472 "Passed a string or byte object instead of true iterator or stream." 

473 ) 

474 if not hasattr(stream, "read"): 

475 for item in stream: 

476 if item: 

477 yield item 

478 return 

479 stream = t.cast(t.IO[bytes], stream) 

480 if not isinstance(stream, LimitedStream) and limit is not None: 

481 stream = t.cast(t.IO[bytes], LimitedStream(stream, limit)) 

482 _read = stream.read 

483 while True: 

484 item = _read(buffer_size) 

485 if not item: 

486 break 

487 yield item 

488 

489 

def make_line_iter(
    stream: t.Iterable[bytes] | t.IO[bytes],
    limit: int | None = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Safely iterates line-based over an input stream. If the input stream
    is not a :class:`LimitedStream` the `limit` parameter is mandatory.

    This uses the stream's :meth:`~file.read` method internally as opposite
    to the :meth:`~file.readline` method that is unsafe and can only be used
    in violation of the WSGI specification. The same problem applies to the
    `__iter__` function of the input stream which calls :meth:`~file.readline`
    without arguments.

    If you need line-by-line processing it's strongly recommended to iterate
    over the input stream using this helper function.

    .. deprecated:: 2.3
        Will be removed in Werkzeug 3.0.

    .. versionadded:: 0.11
        added support for the `cap_at_buffer` parameter.

    .. versionadded:: 0.9
        added support for iterators as input stream.

    .. versionchanged:: 0.8
        This function now ensures that the limit was reached.

    :param stream: the stream or iterate to iterate over.
    :param limit: the limit in bytes for the stream.  (Usually
                  content length.  Not necessary if the `stream`
                  is a :class:`LimitedStream`.
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size.  Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    warnings.warn(
        "'make_line_iter' is deprecated and will be removed in Werkzeug 3.0.",
        DeprecationWarning,
        stacklevel=2,
    )
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    # Peek at the first item to learn whether we work in str or bytes mode.
    first_item = next(_iter, "")

    if not first_item:
        return

    # s() converts the literal newline markers below to the stream's type.
    s = _make_encode_wrapper(first_item)
    empty = t.cast(bytes, s(""))
    cr = t.cast(bytes, s("\r"))
    lf = t.cast(bytes, s("\n"))
    crlf = t.cast(bytes, s("\r\n"))

    # Put the peeked item back in front of the remaining chunks.
    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))

    def _iter_basic_lines() -> t.Iterator[bytes]:
        # Yield lines as soon as a newline is seen; carry any incomplete
        # tail over to the next chunk via ``buffer``.
        _join = empty.join
        buffer: list[bytes] = []
        while True:
            new_data = next(_iter, "")
            if not new_data:
                break
            new_buf: list[bytes] = []
            buf_size = 0
            for item in t.cast(
                t.Iterator[bytes], chain(buffer, new_data.splitlines(True))
            ):
                new_buf.append(item)
                buf_size += len(item)
                if item and item[-1:] in crlf:
                    # Item ends with a newline: a complete line is buffered.
                    yield _join(new_buf)
                    new_buf = []
                elif cap_at_buffer and buf_size >= buffer_size:
                    # Oversized pending line: emit it in buffer_size pieces.
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
            buffer = new_buf
        if buffer:
            yield _join(buffer)

    # This hackery is necessary to merge 'foo\r' and '\n' into one item
    # of 'foo\r\n' if we were unlucky and we hit a chunk boundary.
    previous = empty
    for item in _iter_basic_lines():
        if item == lf and previous[-1:] == cr:
            previous += item
            item = empty
        if previous:
            yield previous
        previous = item
    if previous:
        yield previous

589 

590 

def make_chunk_iter(
    stream: t.Iterable[bytes] | t.IO[bytes],
    separator: bytes,
    limit: int | None = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Works like :func:`make_line_iter` but accepts a separator
    which divides chunks.  If you want newline based processing
    you should use :func:`make_line_iter` instead as it
    supports arbitrary newline markers.

    .. deprecated:: 2.3
        Will be removed in Werkzeug 3.0.

    .. versionchanged:: 0.11
        added support for the `cap_at_buffer` parameter.

    .. versionchanged:: 0.9
        added support for iterators as input stream.

    .. versionadded:: 0.8

    :param stream: the stream or iterate to iterate over.
    :param separator: the separator that divides chunks.
    :param limit: the limit in bytes for the stream.  (Usually
                  content length.  Not necessary if the `stream`
                  is otherwise already limited).
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size.  Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    warnings.warn(
        "'make_chunk_iter' is deprecated and will be removed in Werkzeug 3.0.",
        DeprecationWarning,
        stacklevel=2,
    )
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    # Peek at the first item to learn whether we work in str or bytes mode.
    first_item = next(_iter, b"")

    if not first_item:
        return

    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
    if isinstance(first_item, str):
        separator = _to_str(separator)
        # Capturing group keeps the separator in the split output so it
        # can be matched against below.
        _split = re.compile(f"({re.escape(separator)})").split
        _join = "".join
    else:
        separator = _to_bytes(separator)
        _split = re.compile(b"(" + re.escape(separator) + b")").split
        _join = b"".join

    # ``buffer`` carries the unterminated tail across read chunks.
    buffer: list[bytes] = []
    while True:
        new_data = next(_iter, b"")
        if not new_data:
            break
        chunks = _split(new_data)
        new_buf: list[bytes] = []
        buf_size = 0
        for item in chain(buffer, chunks):
            if item == separator:
                # A full chunk is buffered; emit it and start over.
                yield _join(new_buf)
                new_buf = []
                buf_size = 0
            else:
                buf_size += len(item)
                new_buf.append(item)

                if cap_at_buffer and buf_size >= buffer_size:
                    # Oversized pending chunk: emit it in buffer_size pieces.
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
                    buf_size = len(rv)

        buffer = new_buf
    if buffer:
        yield _join(buffer)

675 

676 

class LimitedStream(io.RawIOBase):
    """Wrap a stream so that it doesn't read more than a given limit. This is used to
    limit ``wsgi.input`` to the ``Content-Length`` header value or
    :attr:`.Request.max_content_length`.

    When attempting to read after the limit has been reached, :meth:`on_exhausted` is
    called. When the limit is a maximum, this raises :exc:`.RequestEntityTooLarge`.

    If reading from the stream returns zero bytes or raises an error,
    :meth:`on_disconnect` is called, which raises :exc:`.ClientDisconnected`. When the
    limit is a maximum and zero bytes were read, no error is raised, since it may be the
    end of the stream.

    If the limit is reached before the underlying stream is exhausted (such as a file
    that is too large, or an infinite stream), the remaining contents of the stream
    cannot be read safely. Depending on how the server handles this, clients may show a
    "connection reset" failure instead of seeing the 413 response.

    :param stream: The stream to read from. Must be a readable binary IO object.
    :param limit: The limit in bytes to not read past. Should be either the
        ``Content-Length`` header value or ``request.max_content_length``.
    :param is_max: Whether the given ``limit`` is ``request.max_content_length`` instead
        of the ``Content-Length`` header value. This changes how exhausted and
        disconnect events are handled.

    .. versionchanged:: 2.3
        Handle ``max_content_length`` differently than ``Content-Length``.

    .. versionchanged:: 2.3
        Implements ``io.RawIOBase`` rather than ``io.IOBase``.
    """

    def __init__(self, stream: t.IO[bytes], limit: int, is_max: bool = False) -> None:
        # The wrapped stream; only its read/readinto methods are used.
        self._stream = stream
        # Bytes read from the wrapped stream so far.
        self._pos = 0
        self.limit = limit
        # True when ``limit`` is max_content_length rather than the declared
        # Content-Length; changes how exhausted/disconnect are handled.
        self._limit_is_max = is_max

    @property
    def is_exhausted(self) -> bool:
        """Whether the current stream position has reached the limit."""
        return self._pos >= self.limit

    def on_exhausted(self) -> None:
        """Called when attempting to read after the limit has been reached.

        The default behavior is to do nothing, unless the limit is a maximum, in which
        case it raises :exc:`.RequestEntityTooLarge`.

        .. versionchanged:: 2.3
            Raises ``RequestEntityTooLarge`` if the limit is a maximum.

        .. versionchanged:: 2.3
            Any return value is ignored.
        """
        if self._limit_is_max:
            raise RequestEntityTooLarge()

    def on_disconnect(self, error: Exception | None = None) -> None:
        """Called when an attempted read receives zero bytes before the limit was
        reached. This indicates that the client disconnected before sending the full
        request body.

        The default behavior is to raise :exc:`.ClientDisconnected`, unless the limit is
        a maximum and no error was raised.

        .. versionchanged:: 2.3
            Added the ``error`` parameter. Do nothing if the limit is a maximum and no
            error was raised.

        .. versionchanged:: 2.3
            Any return value is ignored.
        """
        if not self._limit_is_max or error is not None:
            raise ClientDisconnected()

        # If the limit is a maximum, then we may have read zero bytes because the
        # streaming body is complete. There's no way to distinguish that from the
        # client disconnecting early.

    def exhaust(self) -> bytes:
        """Exhaust the stream by reading until the limit is reached or the client
        disconnects, returning the remaining data.

        .. versionchanged:: 2.3
            Return the remaining data.

        .. versionchanged:: 2.2.3
            Handle case where wrapped stream returns fewer bytes than requested.
        """
        if not self.is_exhausted:
            return self.readall()

        return b""

    def readinto(self, b: bytearray) -> int | None:  # type: ignore[override]
        """Read up to ``len(b)`` bytes into ``b``, never past the limit.

        Returns the number of bytes read; 0 signals exhaustion or
        disconnect (which may raise via the ``on_*`` hooks first).
        """
        size = len(b)
        remaining = self.limit - self._pos

        if remaining <= 0:
            self.on_exhausted()
            return 0

        if hasattr(self._stream, "readinto"):
            # Use stream.readinto if it's available.
            if size <= remaining:
                # The size fits in the remaining limit, use the buffer directly.
                try:
                    out_size: int | None = self._stream.readinto(b)
                except (OSError, ValueError) as e:
                    self.on_disconnect(error=e)
                    return 0
            else:
                # Use a temp buffer with the remaining limit as the size.
                temp_b = bytearray(remaining)

                try:
                    out_size = self._stream.readinto(temp_b)
                except (OSError, ValueError) as e:
                    self.on_disconnect(error=e)
                    return 0

                if out_size:
                    b[:out_size] = temp_b
        else:
            # WSGI requires that stream.read is available.
            try:
                data = self._stream.read(min(size, remaining))
            except (OSError, ValueError) as e:
                self.on_disconnect(error=e)
                return 0

            out_size = len(data)
            b[:out_size] = data

        if not out_size:
            # Read zero bytes from the stream.
            self.on_disconnect()
            return 0

        self._pos += out_size
        return out_size

    def readall(self) -> bytes:
        """Read and return everything up to the limit."""
        if self.is_exhausted:
            self.on_exhausted()
            return b""

        out = bytearray()

        # The parent implementation uses "while True", which results in an extra read.
        while not self.is_exhausted:
            data = self.read(1024 * 64)

            # Stream may return empty before a max limit is reached.
            if not data:
                break

            out.extend(data)

        return bytes(out)

    def tell(self) -> int:
        """Return the current stream position.

        .. versionadded:: 0.9
        """
        return self._pos

    def readable(self) -> bool:
        # Required by the io.RawIOBase interface.
        return True