Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/wsgi.py: 35%
337 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-09 06:08 +0000
1from __future__ import annotations
3import io
4import re
5import typing as t
6import warnings
7from functools import partial
8from functools import update_wrapper
9from itertools import chain
11from ._internal import _make_encode_wrapper
12from ._internal import _to_bytes
13from ._internal import _to_str
14from .exceptions import ClientDisconnected
15from .exceptions import RequestEntityTooLarge
16from .sansio import utils as _sansio_utils
17from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API
19if t.TYPE_CHECKING:
20 from _typeshed.wsgi import WSGIApplication
21 from _typeshed.wsgi import WSGIEnvironment
def responder(f: t.Callable[..., WSGIApplication]) -> WSGIApplication:
    """Marks a function as responder. Decorate a function with it and it
    will automatically call the return value as WSGI application.

    Example::

        @responder
        def application(environ, start_response):
            return Response('Hello World!')
    """

    def wrapper(*args: t.Any) -> t.Any:
        # Call the decorated function to get a WSGI app, then invoke that
        # app with the trailing (environ, start_response) pair.
        return f(*args)(*args[-2:])

    return update_wrapper(wrapper, f)
def get_current_url(
    environ: WSGIEnvironment,
    root_only: bool = False,
    strip_querystring: bool = False,
    host_only: bool = False,
    trusted_hosts: t.Iterable[str] | None = None,
) -> str:
    """Recreate the URL for a request from the parts in a WSGI
    environment.

    The URL is an IRI, not a URI, so it may contain Unicode characters.
    Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.

    :param environ: The WSGI environment to get the URL parts from.
    :param root_only: Only build the root path, don't include the
        remaining path or query string.
    :param strip_querystring: Don't include the query string.
    :param host_only: Only build the scheme and host.
    :param trusted_hosts: A list of trusted host names to validate the
        host against.
    """
    parts: dict[str, t.Any] = {
        "scheme": environ["wsgi.url_scheme"],
        "host": get_host(environ, trusted_hosts),
    }

    if host_only:
        return _sansio_utils.get_current_url(**parts)

    parts["root_path"] = environ.get("SCRIPT_NAME", "")

    if not root_only:
        parts["path"] = environ.get("PATH_INFO", "")

        if not strip_querystring:
            # WSGI transports the raw query string as latin-1 text.
            parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")

    return _sansio_utils.get_current_url(**parts)
75def _get_server(
76 environ: WSGIEnvironment,
77) -> tuple[str, int | None] | None:
78 name = environ.get("SERVER_NAME")
80 if name is None:
81 return None
83 try:
84 port: int | None = int(environ.get("SERVER_PORT", None))
85 except (TypeError, ValueError):
86 # unix socket
87 port = None
89 return name, port
def get_host(
    environ: WSGIEnvironment, trusted_hosts: t.Iterable[str] | None = None
) -> str:
    """Return the host for the given WSGI environment.

    The ``Host`` header is preferred, then ``SERVER_NAME`` if it's not
    set. The returned host will only contain the port if it is different
    than the standard port for the protocol.

    Optionally, verify that the host is trusted using
    :func:`host_is_trusted` and raise a
    :exc:`~werkzeug.exceptions.SecurityError` if it is not.

    :param environ: A WSGI environment dict.
    :param trusted_hosts: A list of trusted host names.

    :return: Host, with port if necessary.
    :raise ~werkzeug.exceptions.SecurityError: If the host is not
        trusted.
    """
    scheme = environ["wsgi.url_scheme"]
    host_header = environ.get("HTTP_HOST")
    server = _get_server(environ)
    return _sansio_utils.get_host(scheme, host_header, server, trusted_hosts)
def get_content_length(environ: WSGIEnvironment) -> int | None:
    """Return the ``Content-Length`` header value as an int. If the header is not given
    or the ``Transfer-Encoding`` header is ``chunked``, ``None`` is returned to indicate
    a streaming request. If the value is not an integer, or negative, 0 is returned.

    :param environ: The WSGI environ to get the content length from.

    .. versionadded:: 0.9
    """
    content_length = environ.get("CONTENT_LENGTH")
    transfer_encoding = environ.get("HTTP_TRANSFER_ENCODING")
    return _sansio_utils.get_content_length(
        http_content_length=content_length,
        http_transfer_encoding=transfer_encoding,
    )
def get_input_stream(
    environ: WSGIEnvironment,
    safe_fallback: bool = True,
    max_content_length: int | None = None,
) -> t.IO[bytes]:
    """Return the WSGI input stream, wrapped so that it may be read safely without going
    past the ``Content-Length`` header value or ``max_content_length``.

    If ``Content-Length`` exceeds ``max_content_length``, a
    :exc:`RequestEntityTooLarge`` ``413 Content Too Large`` error is raised.

    If the WSGI server sets ``environ["wsgi.input_terminated"]``, it indicates that the
    server handles terminating the stream, so it is safe to read directly. For example,
    a server that knows how to handle chunked requests safely would set this.

    If ``max_content_length`` is set, it can be enforced on streams if
    ``wsgi.input_terminated`` is set. Otherwise, an empty stream is returned unless the
    user explicitly disables this safe fallback.

    If the limit is reached before the underlying stream is exhausted (such as a file
    that is too large, or an infinite stream), the remaining contents of the stream
    cannot be read safely. Depending on how the server handles this, clients may show a
    "connection reset" failure instead of seeing the 413 response.

    :param environ: The WSGI environ containing the stream.
    :param safe_fallback: Return an empty stream when ``Content-Length`` is not set.
        Disabling this allows infinite streams, which can be a denial-of-service risk.
    :param max_content_length: The maximum length that content-length or streaming
        requests may not exceed.

    .. versionchanged:: 2.3.2
        ``max_content_length`` is only applied to streaming requests if the server sets
        ``wsgi.input_terminated``.

    .. versionchanged:: 2.3
        Check ``max_content_length`` and raise an error if it is exceeded.

    .. versionadded:: 0.9
    """
    stream = t.cast(t.IO[bytes], environ["wsgi.input"])
    content_length = get_content_length(environ)

    # Reject early when the declared Content-Length already exceeds the
    # limit, without reading any body data.
    if content_length is not None and max_content_length is not None:
        if content_length > max_content_length:
            raise RequestEntityTooLarge()

    # A WSGI server can set this to indicate that it terminates the input stream. In
    # that case the stream is safe without wrapping, or can enforce a max length.
    if "wsgi.input_terminated" in environ:
        if max_content_length is not None:
            # If this is moved above, it can cause the stream to hang if a read attempt
            # is made when the client sends no data. For example, the development server
            # does not handle buffering except for chunked encoding.
            return t.cast(
                t.IO[bytes], LimitedStream(stream, max_content_length, is_max=True)
            )

        return stream

    # No limit given, return an empty stream unless the user explicitly allows the
    # potentially infinite stream. An infinite stream is dangerous if it's not expected,
    # as it can tie up a worker indefinitely.
    if content_length is None:
        return io.BytesIO() if safe_fallback else stream

    # Content-Length is known (and within any max limit); cap reads at it.
    return t.cast(t.IO[bytes], LimitedStream(stream, content_length))
def get_path_info(
    environ: WSGIEnvironment,
    charset: t.Any = ...,
    errors: str | None = None,
) -> str:
    """Return ``PATH_INFO`` from the WSGI environment.

    :param environ: WSGI environment to get the path from.

    .. versionchanged:: 2.3
        The ``charset`` and ``errors`` parameters are deprecated and will be removed in
        Werkzeug 3.0.

    .. versionadded:: 0.9
    """
    if charset is ...:
        charset = "utf-8"
    else:
        warnings.warn(
            "The 'charset' parameter is deprecated and will be removed"
            " in Werkzeug 3.0.",
            DeprecationWarning,
            stacklevel=2,
        )

        if charset is None:
            charset = "utf-8"

    if errors is None:
        errors = "replace"
    else:
        warnings.warn(
            "The 'errors' parameter is deprecated and will be removed in Werkzeug 3.0",
            DeprecationWarning,
            stacklevel=2,
        )

    # WSGI transports the raw path bytes as latin-1 text; round-trip back to
    # bytes before decoding with the requested charset.
    raw = environ.get("PATH_INFO", "").encode("latin1")
    return raw.decode(charset, errors)  # type: ignore[no-any-return]
class ClosingIterator:
    """The WSGI specification requires that all middlewares and gateways
    respect the `close` callback of the iterable returned by the application.
    Because it is useful to add another close action to a returned iterable
    and adding a custom iterable is a boring task this class can be used for
    that::

        return ClosingIterator(app(environ, start_response), [cleanup_session,
                                                              cleanup_locals])

    If there is just one close function it can be passed instead of the list.

    A closing iterator is not needed if the application uses response objects
    and finishes the processing if the response is started::

        try:
            return response(environ, start_response)
        finally:
            cleanup_session()
            cleanup_locals()
    """

    def __init__(
        self,
        iterable: t.Iterable[bytes],
        callbacks: None
        | (t.Callable[[], None] | t.Iterable[t.Callable[[], None]]) = None,
    ) -> None:
        self._next = t.cast(t.Callable[[], bytes], partial(next, iter(iterable)))

        # Normalize to a list of zero or more close callbacks.
        if callbacks is None:
            close_funcs: list[t.Callable[[], None]] = []
        elif callable(callbacks):
            close_funcs = [callbacks]
        else:
            close_funcs = list(callbacks)

        # The wrapped iterable's own close must run before the extras.
        inner_close = getattr(iterable, "close", None)

        if inner_close:
            close_funcs.insert(0, inner_close)

        self._callbacks = close_funcs

    def __iter__(self) -> ClosingIterator:
        return self

    def __next__(self) -> bytes:
        return self._next()

    def close(self) -> None:
        for fn in self._callbacks:
            fn()
def wrap_file(
    environ: WSGIEnvironment, file: t.IO[bytes], buffer_size: int = 8192
) -> t.Iterable[bytes]:
    """Wraps a file. This uses the WSGI server's file wrapper if available
    or otherwise the generic :class:`FileWrapper`.

    .. versionadded:: 0.5

    If the file wrapper from the WSGI server is used it's important to not
    iterate over it from inside the application but to pass it through
    unchanged. If you want to pass out a file wrapper inside a response
    object you have to set :attr:`Response.direct_passthrough` to `True`.

    More information about file wrappers are available in :pep:`333`.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """
    wrapper = environ.get("wsgi.file_wrapper", FileWrapper)
    return wrapper(file, buffer_size)  # type: ignore[no-any-return]
class FileWrapper:
    """This class can be used to convert a :class:`file`-like object into
    an iterable. It yields `buffer_size` blocks until the file is fully
    read.

    You should not use this class directly but rather use the
    :func:`wrap_file` function that uses the WSGI server's file wrapper
    support if it's available.

    .. versionadded:: 0.5

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """

    def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
        self.file = file
        self.buffer_size = buffer_size

    def close(self) -> None:
        # The wrapped object may not be a real file; close only if it can.
        if hasattr(self.file, "close"):
            self.file.close()

    def seekable(self) -> bool:
        # Prefer the object's own answer, fall back to probing for seek().
        if hasattr(self.file, "seekable"):
            return self.file.seekable()

        return hasattr(self.file, "seek")

    def seek(self, *args: t.Any) -> None:
        if hasattr(self.file, "seek"):
            self.file.seek(*args)

    def tell(self) -> int | None:
        if hasattr(self.file, "tell"):
            return self.file.tell()

        return None

    def __iter__(self) -> FileWrapper:
        return self

    def __next__(self) -> bytes:
        chunk = self.file.read(self.buffer_size)

        if not chunk:
            raise StopIteration()

        return chunk
class _RangeWrapper:
    # private for now, but should we make it public in the future ?

    """This class can be used to convert an iterable object into
    an iterable that will only yield a piece of the underlying content.
    It yields blocks until the underlying stream range is fully read.
    The yielded blocks will have a size that can't exceed the original
    iterator defined block size, but that can be smaller.

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param iterable: an iterable object with a :meth:`__next__` method.
    :param start_byte: byte from which read will start.
    :param byte_range: how many bytes to read.
    """

    def __init__(
        self,
        iterable: t.Iterable[bytes] | t.IO[bytes],
        start_byte: int = 0,
        byte_range: int | None = None,
    ) -> None:
        self.iterable = iter(iterable)
        self.byte_range = byte_range
        self.start_byte = start_byte
        # Exclusive end position; None means "read to the end".
        self.end_byte = None

        if byte_range is not None:
            self.end_byte = start_byte + byte_range

        # Absolute position reached so far in the underlying iterable.
        self.read_length = 0
        self.seekable = hasattr(iterable, "seekable") and iterable.seekable()
        self.end_reached = False

    def __iter__(self) -> _RangeWrapper:
        return self

    def _next_chunk(self) -> bytes:
        """Read one chunk from the underlying iterable, updating the
        running position, and flag the end when it is exhausted.
        """
        try:
            chunk = next(self.iterable)
            self.read_length += len(chunk)
            return chunk
        except StopIteration:
            self.end_reached = True
            raise

    def _first_iteration(self) -> tuple[bytes | None, int]:
        """Position the stream at ``start_byte``.

        Seeks directly when the underlying object supports it; otherwise
        reads and discards chunks until ``start_byte`` is passed, trimming
        the first kept chunk. Returns the first chunk (``None`` when a seek
        was used) and the effective position it was read from.
        """
        chunk = None
        if self.seekable:
            self.iterable.seek(self.start_byte)  # type: ignore
            self.read_length = self.iterable.tell()  # type: ignore
            contextual_read_length = self.read_length
        else:
            while self.read_length <= self.start_byte:
                chunk = self._next_chunk()
            if chunk is not None:
                # Drop the part of the chunk before start_byte.
                chunk = chunk[self.start_byte - self.read_length :]
            contextual_read_length = self.start_byte
        return chunk, contextual_read_length

    def _next(self) -> bytes:
        """Return the next chunk, trimmed so it never passes ``end_byte``."""
        if self.end_reached:
            raise StopIteration()
        chunk = None
        contextual_read_length = self.read_length
        if self.read_length == 0:
            chunk, contextual_read_length = self._first_iteration()
        if chunk is None:
            chunk = self._next_chunk()
        if self.end_byte is not None and self.read_length >= self.end_byte:
            self.end_reached = True
            return chunk[: self.end_byte - contextual_read_length]
        return chunk

    def __next__(self) -> bytes:
        chunk = self._next()
        if chunk:
            return chunk
        # An empty trimmed chunk means the range is complete.
        self.end_reached = True
        raise StopIteration()

    def close(self) -> None:
        if hasattr(self.iterable, "close"):
            self.iterable.close()
458def _make_chunk_iter(
459 stream: t.Iterable[bytes] | t.IO[bytes],
460 limit: int | None,
461 buffer_size: int,
462) -> t.Iterator[bytes]:
463 """Helper for the line and chunk iter functions."""
464 warnings.warn(
465 "'_make_chunk_iter' is deprecated and will be removed in Werkzeug 3.0.",
466 DeprecationWarning,
467 stacklevel=2,
468 )
470 if isinstance(stream, (bytes, bytearray, str)):
471 raise TypeError(
472 "Passed a string or byte object instead of true iterator or stream."
473 )
474 if not hasattr(stream, "read"):
475 for item in stream:
476 if item:
477 yield item
478 return
479 stream = t.cast(t.IO[bytes], stream)
480 if not isinstance(stream, LimitedStream) and limit is not None:
481 stream = t.cast(t.IO[bytes], LimitedStream(stream, limit))
482 _read = stream.read
483 while True:
484 item = _read(buffer_size)
485 if not item:
486 break
487 yield item
def make_line_iter(
    stream: t.Iterable[bytes] | t.IO[bytes],
    limit: int | None = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Safely iterates line-based over an input stream. If the input stream
    is not a :class:`LimitedStream` the `limit` parameter is mandatory.

    This uses the stream's :meth:`~file.read` method internally as opposite
    to the :meth:`~file.readline` method that is unsafe and can only be used
    in violation of the WSGI specification. The same problem applies to the
    `__iter__` function of the input stream which calls :meth:`~file.readline`
    without arguments.

    If you need line-by-line processing it's strongly recommended to iterate
    over the input stream using this helper function.

    .. deprecated:: 2.3
        Will be removed in Werkzeug 3.0.

    .. versionadded:: 0.11
        added support for the `cap_at_buffer` parameter.

    .. versionadded:: 0.9
        added support for iterators as input stream.

    .. versionchanged:: 0.8
        This function now ensures that the limit was reached.

    :param stream: the stream or iterate to iterate over.
    :param limit: the limit in bytes for the stream. (Usually
                  content length. Not necessary if the `stream`
                  is a :class:`LimitedStream`.
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size. Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    warnings.warn(
        "'make_line_iter' is deprecated and will be removed in Werkzeug 3.0.",
        DeprecationWarning,
        stacklevel=2,
    )
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    first_item = next(_iter, "")

    if not first_item:
        return

    # Work in either str or bytes, matching what the stream yields; the
    # wrapper produces literals of the matching type.
    s = _make_encode_wrapper(first_item)
    empty = t.cast(bytes, s(""))
    cr = t.cast(bytes, s("\r"))
    lf = t.cast(bytes, s("\n"))
    crlf = t.cast(bytes, s("\r\n"))

    # Push the consumed first item back in front of the iterator.
    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))

    def _iter_basic_lines() -> t.Iterator[bytes]:
        """Yield lines, but a line spanning a chunk boundary may come out
        split (e.g. 'foo\\r' then '\\n'); the caller merges those.
        """
        _join = empty.join
        buffer: list[bytes] = []
        while True:
            new_data = next(_iter, "")
            if not new_data:
                break
            new_buf: list[bytes] = []
            buf_size = 0
            for item in t.cast(
                t.Iterator[bytes], chain(buffer, new_data.splitlines(True))
            ):
                new_buf.append(item)
                buf_size += len(item)
                if item and item[-1:] in crlf:
                    # Complete line terminator seen: emit the joined line.
                    yield _join(new_buf)
                    new_buf = []
                elif cap_at_buffer and buf_size >= buffer_size:
                    # Cap pending data: emit buffer_size pieces and keep
                    # only the remainder buffered.
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
            buffer = new_buf
        if buffer:
            yield _join(buffer)

    # This hackery is necessary to merge 'foo\r' and '\n' into one item
    # of 'foo\r\n' if we were unlucky and we hit a chunk boundary.
    previous = empty
    for item in _iter_basic_lines():
        if item == lf and previous[-1:] == cr:
            previous += item
            item = empty
        if previous:
            yield previous
        previous = item
    if previous:
        yield previous
def make_chunk_iter(
    stream: t.Iterable[bytes] | t.IO[bytes],
    separator: bytes,
    limit: int | None = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Works like :func:`make_line_iter` but accepts a separator
    which divides chunks. If you want newline based processing
    you should use :func:`make_line_iter` instead as it
    supports arbitrary newline markers.

    .. deprecated:: 2.3
        Will be removed in Werkzeug 3.0.

    .. versionchanged:: 0.11
        added support for the `cap_at_buffer` parameter.

    .. versionchanged:: 0.9
        added support for iterators as input stream.

    .. versionadded:: 0.8

    :param stream: the stream or iterate to iterate over.
    :param separator: the separator that divides chunks.
    :param limit: the limit in bytes for the stream. (Usually
                  content length. Not necessary if the `stream`
                  is otherwise already limited).
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size. Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    warnings.warn(
        "'make_chunk_iter' is deprecated and will be removed in Werkzeug 3.0.",
        DeprecationWarning,
        stacklevel=2,
    )
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    first_item = next(_iter, b"")

    if not first_item:
        return

    # Push the consumed first item back, then pick str or bytes helpers
    # to match the type the stream yields.
    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
    if isinstance(first_item, str):
        separator = _to_str(separator)
        # Capturing group keeps the separators in the split output so they
        # can be matched against below.
        _split = re.compile(f"({re.escape(separator)})").split
        _join = "".join
    else:
        separator = _to_bytes(separator)
        _split = re.compile(b"(" + re.escape(separator) + b")").split
        _join = b"".join

    buffer: list[bytes] = []
    while True:
        new_data = next(_iter, b"")
        if not new_data:
            break
        chunks = _split(new_data)
        new_buf: list[bytes] = []
        buf_size = 0
        for item in chain(buffer, chunks):
            if item == separator:
                # A separator ends the current chunk; emit what we have.
                yield _join(new_buf)
                new_buf = []
                buf_size = 0
            else:
                buf_size += len(item)
                new_buf.append(item)

                if cap_at_buffer and buf_size >= buffer_size:
                    # Cap pending data: emit buffer_size pieces and keep
                    # only the remainder buffered.
                    rv = _join(new_buf)

                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]

                    new_buf = [rv]
                    buf_size = len(rv)

        buffer = new_buf
    if buffer:
        # Trailing data after the last separator.
        yield _join(buffer)
class LimitedStream(io.RawIOBase):
    """Wrap a stream so that it doesn't read more than a given limit. This is used to
    limit ``wsgi.input`` to the ``Content-Length`` header value or
    :attr:`.Request.max_content_length`.

    When attempting to read after the limit has been reached, :meth:`on_exhausted` is
    called. When the limit is a maximum, this raises :exc:`.RequestEntityTooLarge`.

    If reading from the stream returns zero bytes or raises an error,
    :meth:`on_disconnect` is called, which raises :exc:`.ClientDisconnected`. When the
    limit is a maximum and zero bytes were read, no error is raised, since it may be the
    end of the stream.

    If the limit is reached before the underlying stream is exhausted (such as a file
    that is too large, or an infinite stream), the remaining contents of the stream
    cannot be read safely. Depending on how the server handles this, clients may show a
    "connection reset" failure instead of seeing the 413 response.

    :param stream: The stream to read from. Must be a readable binary IO object.
    :param limit: The limit in bytes to not read past. Should be either the
        ``Content-Length`` header value or ``request.max_content_length``.
    :param is_max: Whether the given ``limit`` is ``request.max_content_length`` instead
        of the ``Content-Length`` header value. This changes how exhausted and
        disconnect events are handled.

    .. versionchanged:: 2.3
        Handle ``max_content_length`` differently than ``Content-Length``.

    .. versionchanged:: 2.3
        Implements ``io.RawIOBase`` rather than ``io.IOBase``.
    """

    def __init__(self, stream: t.IO[bytes], limit: int, is_max: bool = False) -> None:
        self._stream = stream
        # Bytes read from the wrapped stream so far.
        self._pos = 0
        self.limit = limit
        self._limit_is_max = is_max

    @property
    def is_exhausted(self) -> bool:
        """Whether the current stream position has reached the limit."""
        return self._pos >= self.limit

    def on_exhausted(self) -> None:
        """Called when attempting to read after the limit has been reached.

        The default behavior is to do nothing, unless the limit is a maximum, in which
        case it raises :exc:`.RequestEntityTooLarge`.

        .. versionchanged:: 2.3
            Raises ``RequestEntityTooLarge`` if the limit is a maximum.

        .. versionchanged:: 2.3
            Any return value is ignored.
        """
        if self._limit_is_max:
            raise RequestEntityTooLarge()

    def on_disconnect(self, error: Exception | None = None) -> None:
        """Called when an attempted read receives zero bytes before the limit was
        reached. This indicates that the client disconnected before sending the full
        request body.

        The default behavior is to raise :exc:`.ClientDisconnected`, unless the limit is
        a maximum and no error was raised.

        .. versionchanged:: 2.3
            Added the ``error`` parameter. Do nothing if the limit is a maximum and no
            error was raised.

        .. versionchanged:: 2.3
            Any return value is ignored.
        """
        if not self._limit_is_max or error is not None:
            raise ClientDisconnected()

        # If the limit is a maximum, then we may have read zero bytes because the
        # streaming body is complete. There's no way to distinguish that from the
        # client disconnecting early.

    def exhaust(self) -> bytes:
        """Exhaust the stream by reading until the limit is reached or the client
        disconnects, returning the remaining data.

        .. versionchanged:: 2.3
            Return the remaining data.

        .. versionchanged:: 2.2.3
            Handle case where wrapped stream returns fewer bytes than requested.
        """
        if not self.is_exhausted:
            return self.readall()

        return b""

    def readinto(self, b: bytearray) -> int | None:  # type: ignore[override]
        # Core read primitive; RawIOBase builds read()/readall() on top of it.
        size = len(b)
        remaining = self.limit - self._pos

        if remaining <= 0:
            self.on_exhausted()
            return 0

        if hasattr(self._stream, "readinto"):
            # Use stream.readinto if it's available.
            if size <= remaining:
                # The size fits in the remaining limit, use the buffer directly.
                try:
                    out_size: int | None = self._stream.readinto(b)
                except (OSError, ValueError) as e:
                    self.on_disconnect(error=e)
                    return 0
            else:
                # Use a temp buffer with the remaining limit as the size.
                temp_b = bytearray(remaining)

                try:
                    out_size = self._stream.readinto(temp_b)
                except (OSError, ValueError) as e:
                    self.on_disconnect(error=e)
                    return 0

                if out_size:
                    b[:out_size] = temp_b
        else:
            # WSGI requires that stream.read is available.
            try:
                data = self._stream.read(min(size, remaining))
            except (OSError, ValueError) as e:
                self.on_disconnect(error=e)
                return 0

            out_size = len(data)
            b[:out_size] = data

        if not out_size:
            # Read zero bytes from the stream.
            self.on_disconnect()
            return 0

        self._pos += out_size
        return out_size

    def readall(self) -> bytes:
        if self.is_exhausted:
            self.on_exhausted()
            return b""

        out = bytearray()

        # The parent implementation uses "while True", which results in an extra read.
        while not self.is_exhausted:
            data = self.read(1024 * 64)

            # Stream may return empty before a max limit is reached.
            if not data:
                break

            out.extend(data)

        return bytes(out)

    def tell(self) -> int:
        """Return the current stream position.

        .. versionadded:: 0.9
        """
        return self._pos

    def readable(self) -> bool:
        return True