Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/wsgi.py: 17%
401 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1import io
2import re
3import typing as t
4import warnings
5from functools import partial
6from functools import update_wrapper
7from itertools import chain
9from ._internal import _make_encode_wrapper
10from ._internal import _to_bytes
11from ._internal import _to_str
12from .sansio import utils as _sansio_utils
13from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API
14from .urls import _URLTuple
15from .urls import uri_to_iri
16from .urls import url_join
17from .urls import url_parse
18from .urls import url_quote
20if t.TYPE_CHECKING:
21 from _typeshed.wsgi import WSGIApplication
22 from _typeshed.wsgi import WSGIEnvironment
25def responder(f: t.Callable[..., "WSGIApplication"]) -> "WSGIApplication":
26 """Marks a function as responder. Decorate a function with it and it
27 will automatically call the return value as WSGI application.
29 Example::
31 @responder
32 def application(environ, start_response):
33 return Response('Hello World!')
34 """
35 return update_wrapper(lambda *a: f(*a)(*a[-2:]), f)
38def get_current_url(
39 environ: "WSGIEnvironment",
40 root_only: bool = False,
41 strip_querystring: bool = False,
42 host_only: bool = False,
43 trusted_hosts: t.Optional[t.Iterable[str]] = None,
44) -> str:
45 """Recreate the URL for a request from the parts in a WSGI
46 environment.
48 The URL is an IRI, not a URI, so it may contain Unicode characters.
49 Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.
51 :param environ: The WSGI environment to get the URL parts from.
52 :param root_only: Only build the root path, don't include the
53 remaining path or query string.
54 :param strip_querystring: Don't include the query string.
55 :param host_only: Only build the scheme and host.
56 :param trusted_hosts: A list of trusted host names to validate the
57 host against.
58 """
59 parts = {
60 "scheme": environ["wsgi.url_scheme"],
61 "host": get_host(environ, trusted_hosts),
62 }
64 if not host_only:
65 parts["root_path"] = environ.get("SCRIPT_NAME", "")
67 if not root_only:
68 parts["path"] = environ.get("PATH_INFO", "")
70 if not strip_querystring:
71 parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")
73 return _sansio_utils.get_current_url(**parts)
76def _get_server(
77 environ: "WSGIEnvironment",
78) -> t.Optional[t.Tuple[str, t.Optional[int]]]:
79 name = environ.get("SERVER_NAME")
81 if name is None:
82 return None
84 try:
85 port: t.Optional[int] = int(environ.get("SERVER_PORT", None))
86 except (TypeError, ValueError):
87 # unix socket
88 port = None
90 return name, port
93def get_host(
94 environ: "WSGIEnvironment", trusted_hosts: t.Optional[t.Iterable[str]] = None
95) -> str:
96 """Return the host for the given WSGI environment.
98 The ``Host`` header is preferred, then ``SERVER_NAME`` if it's not
99 set. The returned host will only contain the port if it is different
100 than the standard port for the protocol.
102 Optionally, verify that the host is trusted using
103 :func:`host_is_trusted` and raise a
104 :exc:`~werkzeug.exceptions.SecurityError` if it is not.
106 :param environ: A WSGI environment dict.
107 :param trusted_hosts: A list of trusted host names.
109 :return: Host, with port if necessary.
110 :raise ~werkzeug.exceptions.SecurityError: If the host is not
111 trusted.
112 """
113 return _sansio_utils.get_host(
114 environ["wsgi.url_scheme"],
115 environ.get("HTTP_HOST"),
116 _get_server(environ),
117 trusted_hosts,
118 )
121def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]:
122 """Returns the content length from the WSGI environment as
123 integer. If it's not available or chunked transfer encoding is used,
124 ``None`` is returned.
126 .. versionadded:: 0.9
128 :param environ: the WSGI environ to fetch the content length from.
129 """
130 return _sansio_utils.get_content_length(
131 http_content_length=environ.get("CONTENT_LENGTH"),
132 http_transfer_encoding=environ.get("HTTP_TRANSFER_ENCODING", ""),
133 )
136def get_input_stream(
137 environ: "WSGIEnvironment", safe_fallback: bool = True
138) -> t.IO[bytes]:
139 """Returns the input stream from the WSGI environment and wraps it
140 in the most sensible way possible. The stream returned is not the
141 raw WSGI stream in most cases but one that is safe to read from
142 without taking into account the content length.
144 If content length is not set, the stream will be empty for safety reasons.
145 If the WSGI server supports chunked or infinite streams, it should set
146 the ``wsgi.input_terminated`` value in the WSGI environ to indicate that.
148 .. versionadded:: 0.9
150 :param environ: the WSGI environ to fetch the stream from.
151 :param safe_fallback: use an empty stream as a safe fallback when the
152 content length is not set. Disabling this allows infinite streams,
153 which can be a denial-of-service risk.
154 """
155 stream = t.cast(t.IO[bytes], environ["wsgi.input"])
156 content_length = get_content_length(environ)
158 # A wsgi extension that tells us if the input is terminated. In
159 # that case we return the stream unchanged as we know we can safely
160 # read it until the end.
161 if environ.get("wsgi.input_terminated"):
162 return stream
164 # If the request doesn't specify a content length, returning the stream is
165 # potentially dangerous because it could be infinite, malicious or not. If
166 # safe_fallback is true, return an empty stream instead for safety.
167 if content_length is None:
168 return io.BytesIO() if safe_fallback else stream
170 # Otherwise limit the stream to the content length
171 return t.cast(t.IO[bytes], LimitedStream(stream, content_length))
174def get_query_string(environ: "WSGIEnvironment") -> str:
175 """Returns the ``QUERY_STRING`` from the WSGI environment. This also
176 takes care of the WSGI decoding dance. The string returned will be
177 restricted to ASCII characters.
179 :param environ: WSGI environment to get the query string from.
181 .. deprecated:: 2.2
182 Will be removed in Werkzeug 2.3.
184 .. versionadded:: 0.9
185 """
186 warnings.warn(
187 "'get_query_string' is deprecated and will be removed in Werkzeug 2.3.",
188 DeprecationWarning,
189 stacklevel=2,
190 )
191 qs = environ.get("QUERY_STRING", "").encode("latin1")
192 # QUERY_STRING really should be ascii safe but some browsers
193 # will send us some unicode stuff (I am looking at you IE).
194 # In that case we want to urllib quote it badly.
195 return url_quote(qs, safe=":&%=+$!*'(),")
198def get_path_info(
199 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
200) -> str:
201 """Return the ``PATH_INFO`` from the WSGI environment and decode it
202 unless ``charset`` is ``None``.
204 :param environ: WSGI environment to get the path from.
205 :param charset: The charset for the path info, or ``None`` if no
206 decoding should be performed.
207 :param errors: The decoding error handling.
209 .. versionadded:: 0.9
210 """
211 path = environ.get("PATH_INFO", "").encode("latin1")
212 return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore
215def get_script_name(
216 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
217) -> str:
218 """Return the ``SCRIPT_NAME`` from the WSGI environment and decode
219 it unless `charset` is set to ``None``.
221 :param environ: WSGI environment to get the path from.
222 :param charset: The charset for the path, or ``None`` if no decoding
223 should be performed.
224 :param errors: The decoding error handling.
226 .. deprecated:: 2.2
227 Will be removed in Werkzeug 2.3.
229 .. versionadded:: 0.9
230 """
231 warnings.warn(
232 "'get_script_name' is deprecated and will be removed in Werkzeug 2.3.",
233 DeprecationWarning,
234 stacklevel=2,
235 )
236 path = environ.get("SCRIPT_NAME", "").encode("latin1")
237 return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore
240def pop_path_info(
241 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
242) -> t.Optional[str]:
243 """Removes and returns the next segment of `PATH_INFO`, pushing it onto
244 `SCRIPT_NAME`. Returns `None` if there is nothing left on `PATH_INFO`.
246 If the `charset` is set to `None` bytes are returned.
248 If there are empty segments (``'/foo//bar``) these are ignored but
249 properly pushed to the `SCRIPT_NAME`:
251 >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
252 >>> pop_path_info(env)
253 'a'
254 >>> env['SCRIPT_NAME']
255 '/foo/a'
256 >>> pop_path_info(env)
257 'b'
258 >>> env['SCRIPT_NAME']
259 '/foo/a/b'
261 .. deprecated:: 2.2
262 Will be removed in Werkzeug 2.3.
264 .. versionadded:: 0.5
266 .. versionchanged:: 0.9
267 The path is now decoded and a charset and encoding
268 parameter can be provided.
270 :param environ: the WSGI environment that is modified.
271 :param charset: The ``encoding`` parameter passed to
272 :func:`bytes.decode`.
273 :param errors: The ``errors`` paramater passed to
274 :func:`bytes.decode`.
275 """
276 warnings.warn(
277 "'pop_path_info' is deprecated and will be removed in Werkzeug 2.3.",
278 DeprecationWarning,
279 stacklevel=2,
280 )
282 path = environ.get("PATH_INFO")
283 if not path:
284 return None
286 script_name = environ.get("SCRIPT_NAME", "")
288 # shift multiple leading slashes over
289 old_path = path
290 path = path.lstrip("/")
291 if path != old_path:
292 script_name += "/" * (len(old_path) - len(path))
294 if "/" not in path:
295 environ["PATH_INFO"] = ""
296 environ["SCRIPT_NAME"] = script_name + path
297 rv = path.encode("latin1")
298 else:
299 segment, path = path.split("/", 1)
300 environ["PATH_INFO"] = f"/{path}"
301 environ["SCRIPT_NAME"] = script_name + segment
302 rv = segment.encode("latin1")
304 return _to_str(rv, charset, errors, allow_none_charset=True) # type: ignore
307def peek_path_info(
308 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
309) -> t.Optional[str]:
310 """Returns the next segment on the `PATH_INFO` or `None` if there
311 is none. Works like :func:`pop_path_info` without modifying the
312 environment:
314 >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
315 >>> peek_path_info(env)
316 'a'
317 >>> peek_path_info(env)
318 'a'
320 If the `charset` is set to `None` bytes are returned.
322 .. deprecated:: 2.2
323 Will be removed in Werkzeug 2.3.
325 .. versionadded:: 0.5
327 .. versionchanged:: 0.9
328 The path is now decoded and a charset and encoding
329 parameter can be provided.
331 :param environ: the WSGI environment that is checked.
332 """
333 warnings.warn(
334 "'peek_path_info' is deprecated and will be removed in Werkzeug 2.3.",
335 DeprecationWarning,
336 stacklevel=2,
337 )
339 segments = environ.get("PATH_INFO", "").lstrip("/").split("/", 1)
340 if segments:
341 return _to_str( # type: ignore
342 segments[0].encode("latin1"), charset, errors, allow_none_charset=True
343 )
344 return None
347def extract_path_info(
348 environ_or_baseurl: t.Union[str, "WSGIEnvironment"],
349 path_or_url: t.Union[str, _URLTuple],
350 charset: str = "utf-8",
351 errors: str = "werkzeug.url_quote",
352 collapse_http_schemes: bool = True,
353) -> t.Optional[str]:
354 """Extracts the path info from the given URL (or WSGI environment) and
355 path. The path info returned is a string. The URLs might also be IRIs.
357 If the path info could not be determined, `None` is returned.
359 Some examples:
361 >>> extract_path_info('http://example.com/app', '/app/hello')
362 '/hello'
363 >>> extract_path_info('http://example.com/app',
364 ... 'https://example.com/app/hello')
365 '/hello'
366 >>> extract_path_info('http://example.com/app',
367 ... 'https://example.com/app/hello',
368 ... collapse_http_schemes=False) is None
369 True
371 Instead of providing a base URL you can also pass a WSGI environment.
373 :param environ_or_baseurl: a WSGI environment dict, a base URL or
374 base IRI. This is the root of the
375 application.
376 :param path_or_url: an absolute path from the server root, a
377 relative path (in which case it's the path info)
378 or a full URL.
379 :param charset: the charset for byte data in URLs
380 :param errors: the error handling on decode
381 :param collapse_http_schemes: if set to `False` the algorithm does
382 not assume that http and https on the
383 same server point to the same
384 resource.
386 .. deprecated:: 2.2
387 Will be removed in Werkzeug 2.3.
389 .. versionchanged:: 0.15
390 The ``errors`` parameter defaults to leaving invalid bytes
391 quoted instead of replacing them.
393 .. versionadded:: 0.6
395 """
396 warnings.warn(
397 "'extract_path_info' is deprecated and will be removed in Werkzeug 2.3.",
398 DeprecationWarning,
399 stacklevel=2,
400 )
402 def _normalize_netloc(scheme: str, netloc: str) -> str:
403 parts = netloc.split("@", 1)[-1].split(":", 1)
404 port: t.Optional[str]
406 if len(parts) == 2:
407 netloc, port = parts
408 if (scheme == "http" and port == "80") or (
409 scheme == "https" and port == "443"
410 ):
411 port = None
412 else:
413 netloc = parts[0]
414 port = None
416 if port is not None:
417 netloc += f":{port}"
419 return netloc
421 # make sure whatever we are working on is a IRI and parse it
422 path = uri_to_iri(path_or_url, charset, errors)
423 if isinstance(environ_or_baseurl, dict):
424 environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True)
425 base_iri = uri_to_iri(environ_or_baseurl, charset, errors)
426 base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
427 cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]
429 # normalize the network location
430 base_netloc = _normalize_netloc(base_scheme, base_netloc)
431 cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)
433 # is that IRI even on a known HTTP scheme?
434 if collapse_http_schemes:
435 for scheme in base_scheme, cur_scheme:
436 if scheme not in ("http", "https"):
437 return None
438 else:
439 if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
440 return None
442 # are the netlocs compatible?
443 if base_netloc != cur_netloc:
444 return None
446 # are we below the application path?
447 base_path = base_path.rstrip("/")
448 if not cur_path.startswith(base_path):
449 return None
451 return f"/{cur_path[len(base_path) :].lstrip('/')}"
454class ClosingIterator:
455 """The WSGI specification requires that all middlewares and gateways
456 respect the `close` callback of the iterable returned by the application.
457 Because it is useful to add another close action to a returned iterable
458 and adding a custom iterable is a boring task this class can be used for
459 that::
461 return ClosingIterator(app(environ, start_response), [cleanup_session,
462 cleanup_locals])
464 If there is just one close function it can be passed instead of the list.
466 A closing iterator is not needed if the application uses response objects
467 and finishes the processing if the response is started::
469 try:
470 return response(environ, start_response)
471 finally:
472 cleanup_session()
473 cleanup_locals()
474 """
476 def __init__(
477 self,
478 iterable: t.Iterable[bytes],
479 callbacks: t.Optional[
480 t.Union[t.Callable[[], None], t.Iterable[t.Callable[[], None]]]
481 ] = None,
482 ) -> None:
483 iterator = iter(iterable)
484 self._next = t.cast(t.Callable[[], bytes], partial(next, iterator))
485 if callbacks is None:
486 callbacks = []
487 elif callable(callbacks):
488 callbacks = [callbacks]
489 else:
490 callbacks = list(callbacks)
491 iterable_close = getattr(iterable, "close", None)
492 if iterable_close:
493 callbacks.insert(0, iterable_close)
494 self._callbacks = callbacks
496 def __iter__(self) -> "ClosingIterator":
497 return self
499 def __next__(self) -> bytes:
500 return self._next()
502 def close(self) -> None:
503 for callback in self._callbacks:
504 callback()
507def wrap_file(
508 environ: "WSGIEnvironment", file: t.IO[bytes], buffer_size: int = 8192
509) -> t.Iterable[bytes]:
510 """Wraps a file. This uses the WSGI server's file wrapper if available
511 or otherwise the generic :class:`FileWrapper`.
513 .. versionadded:: 0.5
515 If the file wrapper from the WSGI server is used it's important to not
516 iterate over it from inside the application but to pass it through
517 unchanged. If you want to pass out a file wrapper inside a response
518 object you have to set :attr:`Response.direct_passthrough` to `True`.
520 More information about file wrappers are available in :pep:`333`.
522 :param file: a :class:`file`-like object with a :meth:`~file.read` method.
523 :param buffer_size: number of bytes for one iteration.
524 """
525 return environ.get("wsgi.file_wrapper", FileWrapper)( # type: ignore
526 file, buffer_size
527 )
530class FileWrapper:
531 """This class can be used to convert a :class:`file`-like object into
532 an iterable. It yields `buffer_size` blocks until the file is fully
533 read.
535 You should not use this class directly but rather use the
536 :func:`wrap_file` function that uses the WSGI server's file wrapper
537 support if it's available.
539 .. versionadded:: 0.5
541 If you're using this object together with a :class:`Response` you have
542 to use the `direct_passthrough` mode.
544 :param file: a :class:`file`-like object with a :meth:`~file.read` method.
545 :param buffer_size: number of bytes for one iteration.
546 """
548 def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
549 self.file = file
550 self.buffer_size = buffer_size
552 def close(self) -> None:
553 if hasattr(self.file, "close"):
554 self.file.close()
556 def seekable(self) -> bool:
557 if hasattr(self.file, "seekable"):
558 return self.file.seekable()
559 if hasattr(self.file, "seek"):
560 return True
561 return False
563 def seek(self, *args: t.Any) -> None:
564 if hasattr(self.file, "seek"):
565 self.file.seek(*args)
567 def tell(self) -> t.Optional[int]:
568 if hasattr(self.file, "tell"):
569 return self.file.tell()
570 return None
572 def __iter__(self) -> "FileWrapper":
573 return self
575 def __next__(self) -> bytes:
576 data = self.file.read(self.buffer_size)
577 if data:
578 return data
579 raise StopIteration()
582class _RangeWrapper:
583 # private for now, but should we make it public in the future ?
585 """This class can be used to convert an iterable object into
586 an iterable that will only yield a piece of the underlying content.
587 It yields blocks until the underlying stream range is fully read.
588 The yielded blocks will have a size that can't exceed the original
589 iterator defined block size, but that can be smaller.
591 If you're using this object together with a :class:`Response` you have
592 to use the `direct_passthrough` mode.
594 :param iterable: an iterable object with a :meth:`__next__` method.
595 :param start_byte: byte from which read will start.
596 :param byte_range: how many bytes to read.
597 """
599 def __init__(
600 self,
601 iterable: t.Union[t.Iterable[bytes], t.IO[bytes]],
602 start_byte: int = 0,
603 byte_range: t.Optional[int] = None,
604 ):
605 self.iterable = iter(iterable)
606 self.byte_range = byte_range
607 self.start_byte = start_byte
608 self.end_byte = None
610 if byte_range is not None:
611 self.end_byte = start_byte + byte_range
613 self.read_length = 0
614 self.seekable = hasattr(iterable, "seekable") and iterable.seekable()
615 self.end_reached = False
617 def __iter__(self) -> "_RangeWrapper":
618 return self
620 def _next_chunk(self) -> bytes:
621 try:
622 chunk = next(self.iterable)
623 self.read_length += len(chunk)
624 return chunk
625 except StopIteration:
626 self.end_reached = True
627 raise
629 def _first_iteration(self) -> t.Tuple[t.Optional[bytes], int]:
630 chunk = None
631 if self.seekable:
632 self.iterable.seek(self.start_byte) # type: ignore
633 self.read_length = self.iterable.tell() # type: ignore
634 contextual_read_length = self.read_length
635 else:
636 while self.read_length <= self.start_byte:
637 chunk = self._next_chunk()
638 if chunk is not None:
639 chunk = chunk[self.start_byte - self.read_length :]
640 contextual_read_length = self.start_byte
641 return chunk, contextual_read_length
643 def _next(self) -> bytes:
644 if self.end_reached:
645 raise StopIteration()
646 chunk = None
647 contextual_read_length = self.read_length
648 if self.read_length == 0:
649 chunk, contextual_read_length = self._first_iteration()
650 if chunk is None:
651 chunk = self._next_chunk()
652 if self.end_byte is not None and self.read_length >= self.end_byte:
653 self.end_reached = True
654 return chunk[: self.end_byte - contextual_read_length]
655 return chunk
657 def __next__(self) -> bytes:
658 chunk = self._next()
659 if chunk:
660 return chunk
661 self.end_reached = True
662 raise StopIteration()
664 def close(self) -> None:
665 if hasattr(self.iterable, "close"):
666 self.iterable.close()
669def _make_chunk_iter(
670 stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
671 limit: t.Optional[int],
672 buffer_size: int,
673) -> t.Iterator[bytes]:
674 """Helper for the line and chunk iter functions."""
675 if isinstance(stream, (bytes, bytearray, str)):
676 raise TypeError(
677 "Passed a string or byte object instead of true iterator or stream."
678 )
679 if not hasattr(stream, "read"):
680 for item in stream:
681 if item:
682 yield item
683 return
684 stream = t.cast(t.IO[bytes], stream)
685 if not isinstance(stream, LimitedStream) and limit is not None:
686 stream = t.cast(t.IO[bytes], LimitedStream(stream, limit))
687 _read = stream.read
688 while True:
689 item = _read(buffer_size)
690 if not item:
691 break
692 yield item
695def make_line_iter(
696 stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
697 limit: t.Optional[int] = None,
698 buffer_size: int = 10 * 1024,
699 cap_at_buffer: bool = False,
700) -> t.Iterator[bytes]:
701 """Safely iterates line-based over an input stream. If the input stream
702 is not a :class:`LimitedStream` the `limit` parameter is mandatory.
704 This uses the stream's :meth:`~file.read` method internally as opposite
705 to the :meth:`~file.readline` method that is unsafe and can only be used
706 in violation of the WSGI specification. The same problem applies to the
707 `__iter__` function of the input stream which calls :meth:`~file.readline`
708 without arguments.
710 If you need line-by-line processing it's strongly recommended to iterate
711 over the input stream using this helper function.
713 .. versionchanged:: 0.8
714 This function now ensures that the limit was reached.
716 .. versionadded:: 0.9
717 added support for iterators as input stream.
719 .. versionadded:: 0.11.10
720 added support for the `cap_at_buffer` parameter.
722 :param stream: the stream or iterate to iterate over.
723 :param limit: the limit in bytes for the stream. (Usually
724 content length. Not necessary if the `stream`
725 is a :class:`LimitedStream`.
726 :param buffer_size: The optional buffer size.
727 :param cap_at_buffer: if this is set chunks are split if they are longer
728 than the buffer size. Internally this is implemented
729 that the buffer size might be exhausted by a factor
730 of two however.
731 """
732 _iter = _make_chunk_iter(stream, limit, buffer_size)
734 first_item = next(_iter, "")
735 if not first_item:
736 return
738 s = _make_encode_wrapper(first_item)
739 empty = t.cast(bytes, s(""))
740 cr = t.cast(bytes, s("\r"))
741 lf = t.cast(bytes, s("\n"))
742 crlf = t.cast(bytes, s("\r\n"))
744 _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
746 def _iter_basic_lines() -> t.Iterator[bytes]:
747 _join = empty.join
748 buffer: t.List[bytes] = []
749 while True:
750 new_data = next(_iter, "")
751 if not new_data:
752 break
753 new_buf: t.List[bytes] = []
754 buf_size = 0
755 for item in t.cast(
756 t.Iterator[bytes], chain(buffer, new_data.splitlines(True))
757 ):
758 new_buf.append(item)
759 buf_size += len(item)
760 if item and item[-1:] in crlf:
761 yield _join(new_buf)
762 new_buf = []
763 elif cap_at_buffer and buf_size >= buffer_size:
764 rv = _join(new_buf)
765 while len(rv) >= buffer_size:
766 yield rv[:buffer_size]
767 rv = rv[buffer_size:]
768 new_buf = [rv]
769 buffer = new_buf
770 if buffer:
771 yield _join(buffer)
773 # This hackery is necessary to merge 'foo\r' and '\n' into one item
774 # of 'foo\r\n' if we were unlucky and we hit a chunk boundary.
775 previous = empty
776 for item in _iter_basic_lines():
777 if item == lf and previous[-1:] == cr:
778 previous += item
779 item = empty
780 if previous:
781 yield previous
782 previous = item
783 if previous:
784 yield previous
787def make_chunk_iter(
788 stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
789 separator: bytes,
790 limit: t.Optional[int] = None,
791 buffer_size: int = 10 * 1024,
792 cap_at_buffer: bool = False,
793) -> t.Iterator[bytes]:
794 """Works like :func:`make_line_iter` but accepts a separator
795 which divides chunks. If you want newline based processing
796 you should use :func:`make_line_iter` instead as it
797 supports arbitrary newline markers.
799 .. versionadded:: 0.8
801 .. versionadded:: 0.9
802 added support for iterators as input stream.
804 .. versionadded:: 0.11.10
805 added support for the `cap_at_buffer` parameter.
807 :param stream: the stream or iterate to iterate over.
808 :param separator: the separator that divides chunks.
809 :param limit: the limit in bytes for the stream. (Usually
810 content length. Not necessary if the `stream`
811 is otherwise already limited).
812 :param buffer_size: The optional buffer size.
813 :param cap_at_buffer: if this is set chunks are split if they are longer
814 than the buffer size. Internally this is implemented
815 that the buffer size might be exhausted by a factor
816 of two however.
817 """
818 _iter = _make_chunk_iter(stream, limit, buffer_size)
820 first_item = next(_iter, b"")
821 if not first_item:
822 return
824 _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
825 if isinstance(first_item, str):
826 separator = _to_str(separator)
827 _split = re.compile(f"({re.escape(separator)})").split
828 _join = "".join
829 else:
830 separator = _to_bytes(separator)
831 _split = re.compile(b"(" + re.escape(separator) + b")").split
832 _join = b"".join
834 buffer: t.List[bytes] = []
835 while True:
836 new_data = next(_iter, b"")
837 if not new_data:
838 break
839 chunks = _split(new_data)
840 new_buf: t.List[bytes] = []
841 buf_size = 0
842 for item in chain(buffer, chunks):
843 if item == separator:
844 yield _join(new_buf)
845 new_buf = []
846 buf_size = 0
847 else:
848 buf_size += len(item)
849 new_buf.append(item)
851 if cap_at_buffer and buf_size >= buffer_size:
852 rv = _join(new_buf)
853 while len(rv) >= buffer_size:
854 yield rv[:buffer_size]
855 rv = rv[buffer_size:]
856 new_buf = [rv]
857 buf_size = len(rv)
859 buffer = new_buf
860 if buffer:
861 yield _join(buffer)
864class LimitedStream(io.IOBase):
865 """Wraps a stream so that it doesn't read more than n bytes. If the
866 stream is exhausted and the caller tries to get more bytes from it
867 :func:`on_exhausted` is called which by default returns an empty
868 string. The return value of that function is forwarded
869 to the reader function. So if it returns an empty string
870 :meth:`read` will return an empty string as well.
872 The limit however must never be higher than what the stream can
873 output. Otherwise :meth:`readlines` will try to read past the
874 limit.
876 .. admonition:: Note on WSGI compliance
878 calls to :meth:`readline` and :meth:`readlines` are not
879 WSGI compliant because it passes a size argument to the
880 readline methods. Unfortunately the WSGI PEP is not safely
881 implementable without a size argument to :meth:`readline`
882 because there is no EOF marker in the stream. As a result
883 of that the use of :meth:`readline` is discouraged.
885 For the same reason iterating over the :class:`LimitedStream`
886 is not portable. It internally calls :meth:`readline`.
888 We strongly suggest using :meth:`read` only or using the
889 :func:`make_line_iter` which safely iterates line-based
890 over a WSGI input stream.
892 :param stream: the stream to wrap.
893 :param limit: the limit for the stream, must not be longer than
894 what the string can provide if the stream does not
895 end with `EOF` (like `wsgi.input`)
896 """
898 def __init__(self, stream: t.IO[bytes], limit: int) -> None:
899 self._read = stream.read
900 self._readline = stream.readline
901 self._pos = 0
902 self.limit = limit
904 def __iter__(self) -> "LimitedStream":
905 return self
907 @property
908 def is_exhausted(self) -> bool:
909 """If the stream is exhausted this attribute is `True`."""
910 return self._pos >= self.limit
912 def on_exhausted(self) -> bytes:
913 """This is called when the stream tries to read past the limit.
914 The return value of this function is returned from the reading
915 function.
916 """
917 # Read null bytes from the stream so that we get the
918 # correct end of stream marker.
919 return self._read(0)
921 def on_disconnect(self) -> bytes:
922 """What should happen if a disconnect is detected? The return
923 value of this function is returned from read functions in case
924 the client went away. By default a
925 :exc:`~werkzeug.exceptions.ClientDisconnected` exception is raised.
926 """
927 from .exceptions import ClientDisconnected
929 raise ClientDisconnected()
931 def _exhaust_chunks(self, chunk_size: int = 1024 * 64) -> t.Iterator[bytes]:
932 """Exhaust the stream by reading until the limit is reached or the client
933 disconnects, yielding each chunk.
935 :param chunk_size: How many bytes to read at a time.
937 :meta private:
939 .. versionadded:: 2.2.3
940 """
941 to_read = self.limit - self._pos
943 while to_read > 0:
944 chunk = self.read(min(to_read, chunk_size))
945 yield chunk
946 to_read -= len(chunk)
948 def exhaust(self, chunk_size: int = 1024 * 64) -> None:
949 """Exhaust the stream by reading until the limit is reached or the client
950 disconnects, discarding the data.
952 :param chunk_size: How many bytes to read at a time.
954 .. versionchanged:: 2.2.3
955 Handle case where wrapped stream returns fewer bytes than requested.
956 """
957 for _ in self._exhaust_chunks(chunk_size):
958 pass
960 def read(self, size: t.Optional[int] = None) -> bytes:
961 """Read up to ``size`` bytes from the underlying stream. If size is not
962 provided, read until the limit.
964 If the limit is reached, :meth:`on_exhausted` is called, which returns empty
965 bytes.
967 If no bytes are read and the limit is not reached, or if an error occurs during
968 the read, :meth:`on_disconnect` is called, which raises
969 :exc:`.ClientDisconnected`.
971 :param size: The number of bytes to read. ``None``, default, reads until the
972 limit is reached.
974 .. versionchanged:: 2.2.3
975 Handle case where wrapped stream returns fewer bytes than requested.
976 """
977 if self._pos >= self.limit:
978 return self.on_exhausted()
980 if size is None or size == -1: # -1 is for consistency with file
981 # Keep reading from the wrapped stream until the limit is reached. Can't
982 # rely on stream.read(size) because it's not guaranteed to return size.
983 buf = bytearray()
985 for chunk in self._exhaust_chunks():
986 buf.extend(chunk)
988 return bytes(buf)
990 to_read = min(self.limit - self._pos, size)
992 try:
993 read = self._read(to_read)
994 except (OSError, ValueError):
995 return self.on_disconnect()
997 if to_read and not len(read):
998 # If no data was read, treat it as a disconnect. As long as some data was
999 # read, a subsequent call can still return more before reaching the limit.
1000 return self.on_disconnect()
1002 self._pos += len(read)
1003 return read
1005 def readline(self, size: t.Optional[int] = None) -> bytes:
1006 """Reads one line from the stream."""
1007 if self._pos >= self.limit:
1008 return self.on_exhausted()
1009 if size is None:
1010 size = self.limit - self._pos
1011 else:
1012 size = min(size, self.limit - self._pos)
1013 try:
1014 line = self._readline(size)
1015 except (ValueError, OSError):
1016 return self.on_disconnect()
1017 if size and not line:
1018 return self.on_disconnect()
1019 self._pos += len(line)
1020 return line
1022 def readlines(self, size: t.Optional[int] = None) -> t.List[bytes]:
1023 """Reads a file into a list of strings. It calls :meth:`readline`
1024 until the file is read to the end. It does support the optional
1025 `size` argument if the underlying stream supports it for
1026 `readline`.
1027 """
1028 last_pos = self._pos
1029 result = []
1030 if size is not None:
1031 end = min(self.limit, last_pos + size)
1032 else:
1033 end = self.limit
1034 while True:
1035 if size is not None:
1036 size -= last_pos - self._pos
1037 if self._pos >= end:
1038 break
1039 result.append(self.readline(size))
1040 if size is not None:
1041 last_pos = self._pos
1042 return result
1044 def tell(self) -> int:
1045 """Returns the position of the stream.
1047 .. versionadded:: 0.9
1048 """
1049 return self._pos
1051 def __next__(self) -> bytes:
1052 line = self.readline()
1053 if not line:
1054 raise StopIteration()
1055 return line
1057 def readable(self) -> bool:
1058 return True