# werkzeug/wsgi.py
import io
import re
import typing as t
from functools import partial
from functools import update_wrapper
from itertools import chain

from ._internal import _make_encode_wrapper
from ._internal import _to_bytes
from ._internal import _to_str
from .sansio import utils as _sansio_utils
from .sansio.utils import host_is_trusted  # noqa: F401 # Imported as part of API
from .urls import _URLTuple
from .urls import uri_to_iri
from .urls import url_join
from .urls import url_parse
from .urls import url_quote

if t.TYPE_CHECKING:
    from _typeshed.wsgi import WSGIApplication
    from _typeshed.wsgi import WSGIEnvironment


def responder(f: t.Callable[..., "WSGIApplication"]) -> "WSGIApplication":
    """Marks a function as a responder. Decorate a function with it and it
    will automatically call the return value as a WSGI application.

    Example::

        @responder
        def application(environ, start_response):
            return Response('Hello World!')
    """
    return update_wrapper(lambda *a: f(*a)(*a[-2:]), f)


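# Illustrative usage sketch (added comment, not part of the original module):
# ``responder`` turns a function that *returns* a WSGI callable into a WSGI
# application itself. The names ``demo_app`` and ``hello`` below are made up.
#
# >>> @responder
# ... def demo_app(environ, start_response):
# ...     def hello(environ, start_response):
# ...         start_response("200 OK", [("Content-Type", "text/plain")])
# ...         return [b"Hello World!"]
# ...     return hello
#
# Calling ``demo_app(environ, start_response)`` first evaluates the decorated
# function to get ``hello`` back, then calls ``hello(environ, start_response)``.

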
def get_current_url(
    environ: "WSGIEnvironment",
    root_only: bool = False,
    strip_querystring: bool = False,
    host_only: bool = False,
    trusted_hosts: t.Optional[t.Iterable[str]] = None,
) -> str:
    """Recreate the URL for a request from the parts in a WSGI
    environment.

    The URL is an IRI, not a URI, so it may contain Unicode characters.
    Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.

    :param environ: The WSGI environment to get the URL parts from.
    :param root_only: Only build the root path, don't include the
        remaining path or query string.
    :param strip_querystring: Don't include the query string.
    :param host_only: Only build the scheme and host.
    :param trusted_hosts: A list of trusted host names to validate the
        host against.
    """
    parts = {
        "scheme": environ["wsgi.url_scheme"],
        "host": get_host(environ, trusted_hosts),
    }

    if not host_only:
        parts["root_path"] = environ.get("SCRIPT_NAME", "")

        if not root_only:
            parts["path"] = environ.get("PATH_INFO", "")

            if not strip_querystring:
                parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")

    return _sansio_utils.get_current_url(**parts)


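# Illustrative usage sketch (added comment, not part of the original module):
# rebuilding the request URL from a hand-written environ using the standard
# CGI/WSGI keys this function reads.
#
# >>> environ = {
# ...     "wsgi.url_scheme": "http",
# ...     "HTTP_HOST": "example.org",
# ...     "SCRIPT_NAME": "/app",
# ...     "PATH_INFO": "/page",
# ...     "QUERY_STRING": "x=1",
# ... }
# >>> get_current_url(environ)
# 'http://example.org/app/page?x=1'
# >>> get_current_url(environ, root_only=True)
# 'http://example.org/app/'

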
def _get_server(
    environ: "WSGIEnvironment",
) -> t.Optional[t.Tuple[str, t.Optional[int]]]:
    name = environ.get("SERVER_NAME")

    if name is None:
        return None

    try:
        port: t.Optional[int] = int(environ.get("SERVER_PORT", None))
    except (TypeError, ValueError):
        # unix socket
        port = None

    return name, port


def get_host(
    environ: "WSGIEnvironment", trusted_hosts: t.Optional[t.Iterable[str]] = None
) -> str:
    """Return the host for the given WSGI environment.

    The ``Host`` header is preferred, falling back to ``SERVER_NAME``
    if the header is not set. The returned host will only contain the
    port if it is different than the standard port for the protocol.

    Optionally, verify that the host is trusted using
    :func:`host_is_trusted` and raise a
    :exc:`~werkzeug.exceptions.SecurityError` if it is not.

    :param environ: A WSGI environment dict.
    :param trusted_hosts: A list of trusted host names.

    :return: Host, with port if necessary.
    :raise ~werkzeug.exceptions.SecurityError: If the host is not
        trusted.
    """
    return _sansio_utils.get_host(
        environ["wsgi.url_scheme"],
        environ.get("HTTP_HOST"),
        _get_server(environ),
        trusted_hosts,
    )


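# Illustrative usage sketch (added comment, not part of the original module):
# the ``Host`` header wins over ``SERVER_NAME``, and the default port for the
# scheme is dropped.
#
# >>> get_host({"wsgi.url_scheme": "http", "HTTP_HOST": "example.org:80"})
# 'example.org'
# >>> get_host(
# ...     {"wsgi.url_scheme": "https", "SERVER_NAME": "example.org", "SERVER_PORT": "8443"}
# ... )
# 'example.org:8443'

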
def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]:
    """Returns the content length from the WSGI environment as an
    integer. If it's not available or chunked transfer encoding is used,
    ``None`` is returned.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the content length from.
    """
    if environ.get("HTTP_TRANSFER_ENCODING", "") == "chunked":
        return None

    content_length = environ.get("CONTENT_LENGTH")
    if content_length is not None:
        try:
            return max(0, int(content_length))
        except (ValueError, TypeError):
            pass
    return None


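# Illustrative usage sketch (added comment, not part of the original module):
#
# >>> get_content_length({"CONTENT_LENGTH": "42"})
# 42
# >>> get_content_length({"CONTENT_LENGTH": "not-a-number"}) is None
# True
# >>> get_content_length({"HTTP_TRANSFER_ENCODING": "chunked"}) is None
# True

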
def get_input_stream(
    environ: "WSGIEnvironment", safe_fallback: bool = True
) -> t.IO[bytes]:
    """Returns the input stream from the WSGI environment and wraps it
    in the most sensible way possible. The stream returned is not the
    raw WSGI stream in most cases but one that is safe to read from
    without taking into account the content length.

    If the content length is not set, the stream will be empty for safety
    reasons. If the WSGI server supports chunked or infinite streams, it
    should set the ``wsgi.input_terminated`` value in the WSGI environ to
    indicate that.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the stream from.
    :param safe_fallback: use an empty stream as a safe fallback when the
        content length is not set. Disabling this allows infinite streams,
        which can be a denial-of-service risk.
    """
    stream = t.cast(t.IO[bytes], environ["wsgi.input"])
    content_length = get_content_length(environ)

    # A WSGI extension that tells us if the input is terminated. In
    # that case we return the stream unchanged as we know we can safely
    # read it until the end.
    if environ.get("wsgi.input_terminated"):
        return stream

    # If the request doesn't specify a content length, returning the stream is
    # potentially dangerous because it could be infinite, malicious or not. If
    # safe_fallback is true, return an empty stream instead for safety.
    if content_length is None:
        return io.BytesIO() if safe_fallback else stream

    # Otherwise limit the stream to the content length.
    return t.cast(t.IO[bytes], LimitedStream(stream, content_length))


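# Illustrative usage sketch (added comment, not part of the original module):
# the returned stream never reads past the declared content length, and an
# unknown length yields an empty stream by default.
#
# >>> import io
# >>> environ = {"wsgi.input": io.BytesIO(b"hello world"), "CONTENT_LENGTH": "5"}
# >>> get_input_stream(environ).read()
# b'hello'
# >>> get_input_stream({"wsgi.input": io.BytesIO(b"hello")}).read()
# b''

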
def get_query_string(environ: "WSGIEnvironment") -> str:
    """Returns the ``QUERY_STRING`` from the WSGI environment. This also
    takes care of the WSGI decoding dance. The string returned will be
    restricted to ASCII characters.

    :param environ: WSGI environment to get the query string from.

    .. versionadded:: 0.9
    """
    qs = environ.get("QUERY_STRING", "").encode("latin1")
    # QUERY_STRING really should be ascii safe but some browsers
    # will send us some unicode stuff (I am looking at you IE).
    # In that case we want to urllib quote it badly.
    return url_quote(qs, safe=":&%=+$!*'(),")


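# Illustrative usage sketch (added comment, not part of the original module):
# already ASCII-safe query strings pass through unchanged; any non-ASCII
# bytes would come back percent-quoted instead.
#
# >>> get_query_string({"QUERY_STRING": "a=b&c=d"})
# 'a=b&c=d'

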
def get_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Return the ``PATH_INFO`` from the WSGI environment and decode it
    unless ``charset`` is ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path info, or ``None`` if no
        decoding should be performed.
    :param errors: The decoding error handling.

    .. versionadded:: 0.9
    """
    path = environ.get("PATH_INFO", "").encode("latin1")
    return _to_str(path, charset, errors, allow_none_charset=True)  # type: ignore


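# Illustrative sketch of the "WSGI decoding dance" (added comment, not part
# of the original module): a server hands ``PATH_INFO`` over as a latin-1
# decoded str, so a UTF-8 path like "/café" arrives as "/caf\xc3\xa9" and is
# re-decoded here.
#
# >>> get_path_info({"PATH_INFO": "/caf\xc3\xa9"})
# '/café'

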
def get_script_name(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Return the ``SCRIPT_NAME`` from the WSGI environment and decode
    it unless `charset` is set to ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path, or ``None`` if no decoding
        should be performed.
    :param errors: The decoding error handling.

    .. versionadded:: 0.9
    """
    path = environ.get("SCRIPT_NAME", "").encode("latin1")
    return _to_str(path, charset, errors, allow_none_charset=True)  # type: ignore


def pop_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Removes and returns the next segment of `PATH_INFO`, pushing it onto
    `SCRIPT_NAME`. Returns `None` if there is nothing left on `PATH_INFO`.

    If the `charset` is set to `None` bytes are returned.

    If there are empty segments (``'/foo//bar'``) these are ignored but
    properly pushed to the `SCRIPT_NAME`:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> pop_path_info(env)
    'a'
    >>> env['SCRIPT_NAME']
    '/foo/a'
    >>> pop_path_info(env)
    'b'
    >>> env['SCRIPT_NAME']
    '/foo/a/b'

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
        The path is now decoded and a charset and encoding
        parameter can be provided.

    :param environ: the WSGI environment that is modified.
    :param charset: The ``encoding`` parameter passed to
        :func:`bytes.decode`.
    :param errors: The ``errors`` parameter passed to
        :func:`bytes.decode`.
    """
    path = environ.get("PATH_INFO")
    if not path:
        return None

    script_name = environ.get("SCRIPT_NAME", "")

    # shift multiple leading slashes over
    old_path = path
    path = path.lstrip("/")
    if path != old_path:
        script_name += "/" * (len(old_path) - len(path))

    if "/" not in path:
        environ["PATH_INFO"] = ""
        environ["SCRIPT_NAME"] = script_name + path
        rv = path.encode("latin1")
    else:
        segment, path = path.split("/", 1)
        environ["PATH_INFO"] = f"/{path}"
        environ["SCRIPT_NAME"] = script_name + segment
        rv = segment.encode("latin1")

    return _to_str(rv, charset, errors, allow_none_charset=True)  # type: ignore


def peek_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Returns the next segment on the `PATH_INFO` or `None` if there
    is none. Works like :func:`pop_path_info` without modifying the
    environment:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> peek_path_info(env)
    'a'
    >>> peek_path_info(env)
    'a'

    If the `charset` is set to `None` bytes are returned.

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
        The path is now decoded and a charset and encoding
        parameter can be provided.

    :param environ: the WSGI environment that is checked.
    """
    segments = environ.get("PATH_INFO", "").lstrip("/").split("/", 1)
    if segments:
        return _to_str(  # type: ignore
            segments[0].encode("latin1"), charset, errors, allow_none_charset=True
        )
    return None


def extract_path_info(
    environ_or_baseurl: t.Union[str, "WSGIEnvironment"],
    path_or_url: t.Union[str, _URLTuple],
    charset: str = "utf-8",
    errors: str = "werkzeug.url_quote",
    collapse_http_schemes: bool = True,
) -> t.Optional[str]:
    """Extracts the path info from the given URL (or WSGI environment) and
    path. The path info returned is a string. The URLs might also be IRIs.

    If the path info could not be determined, `None` is returned.

    Some examples:

    >>> extract_path_info('http://example.com/app', '/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello',
    ...                   collapse_http_schemes=False) is None
    True

    Instead of providing a base URL you can also pass a WSGI environment.

    :param environ_or_baseurl: a WSGI environment dict, a base URL or
        base IRI. This is the root of the application.
    :param path_or_url: an absolute path from the server root, a
        relative path (in which case it's the path info) or a full URL.
    :param charset: the charset for byte data in URLs
    :param errors: the error handling on decode
    :param collapse_http_schemes: if set to `False` the algorithm does
        not assume that http and https on the same server point to the
        same resource.

    .. versionchanged:: 0.15
        The ``errors`` parameter defaults to leaving invalid bytes
        quoted instead of replacing them.

    .. versionadded:: 0.6
    """

    def _normalize_netloc(scheme: str, netloc: str) -> str:
        parts = netloc.split("@", 1)[-1].split(":", 1)
        port: t.Optional[str]

        if len(parts) == 2:
            netloc, port = parts
            if (scheme == "http" and port == "80") or (
                scheme == "https" and port == "443"
            ):
                port = None
        else:
            netloc = parts[0]
            port = None

        if port is not None:
            netloc += f":{port}"

        return netloc

    # make sure whatever we are working on is an IRI and parse it
    path = uri_to_iri(path_or_url, charset, errors)
    if isinstance(environ_or_baseurl, dict):
        environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True)
    base_iri = uri_to_iri(environ_or_baseurl, charset, errors)
    base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
    cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]

    # normalize the network location
    base_netloc = _normalize_netloc(base_scheme, base_netloc)
    cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)

    # is that IRI even on a known HTTP scheme?
    if collapse_http_schemes:
        for scheme in base_scheme, cur_scheme:
            if scheme not in ("http", "https"):
                return None
    else:
        if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
            return None

    # are the netlocs compatible?
    if base_netloc != cur_netloc:
        return None

    # are we below the application path?
    base_path = base_path.rstrip("/")
    if not cur_path.startswith(base_path):
        return None

    return f"/{cur_path[len(base_path) :].lstrip('/')}"


class ClosingIterator:
    """The WSGI specification requires that all middlewares and gateways
    respect the `close` callback of the iterable returned by the application.
    Because it is often useful to add another close action to a returned
    iterable, and writing a custom iterable just for that is tedious, this
    class can be used instead::

        return ClosingIterator(app(environ, start_response), [cleanup_session,
                                                              cleanup_locals])

    If there is just one close function it can be passed instead of the list.

    A closing iterator is not needed if the application uses response objects
    and finishes processing once the response is started::

        try:
            return response(environ, start_response)
        finally:
            cleanup_session()
            cleanup_locals()
    """

    def __init__(
        self,
        iterable: t.Iterable[bytes],
        callbacks: t.Optional[
            t.Union[t.Callable[[], None], t.Iterable[t.Callable[[], None]]]
        ] = None,
    ) -> None:
        iterator = iter(iterable)
        self._next = t.cast(t.Callable[[], bytes], partial(next, iterator))
        if callbacks is None:
            callbacks = []
        elif callable(callbacks):
            callbacks = [callbacks]
        else:
            callbacks = list(callbacks)
        iterable_close = getattr(iterable, "close", None)
        if iterable_close:
            callbacks.insert(0, iterable_close)
        self._callbacks = callbacks

    def __iter__(self) -> "ClosingIterator":
        return self

    def __next__(self) -> bytes:
        return self._next()

    def close(self) -> None:
        for callback in self._callbacks:
            callback()


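# Illustrative usage sketch (added comment, not part of the original module):
# the wrapped iterable's own ``close`` (if any) runs first, then the extra
# callbacks passed to the constructor.
#
# >>> log = []
# >>> it = ClosingIterator(iter([b"body"]), lambda: log.append("cleanup"))
# >>> list(it)
# [b'body']
# >>> it.close()
# >>> log
# ['cleanup']

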
def wrap_file(
    environ: "WSGIEnvironment", file: t.IO[bytes], buffer_size: int = 8192
) -> t.Iterable[bytes]:
    """Wraps a file. This uses the WSGI server's file wrapper if available
    or otherwise the generic :class:`FileWrapper`.

    .. versionadded:: 0.5

    If the file wrapper from the WSGI server is used it's important to not
    iterate over it from inside the application but to pass it through
    unchanged. If you want to pass out a file wrapper inside a response
    object you have to set :attr:`Response.direct_passthrough` to `True`.

    More information about file wrappers is available in :pep:`333`.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """
    return environ.get("wsgi.file_wrapper", FileWrapper)(  # type: ignore
        file, buffer_size
    )


class FileWrapper:
    """This class can be used to convert a :class:`file`-like object into
    an iterable. It yields `buffer_size` blocks until the file is fully
    read.

    You should not use this class directly but rather use the
    :func:`wrap_file` function that uses the WSGI server's file wrapper
    support if it's available.

    .. versionadded:: 0.5

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """

    def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
        self.file = file
        self.buffer_size = buffer_size

    def close(self) -> None:
        if hasattr(self.file, "close"):
            self.file.close()

    def seekable(self) -> bool:
        if hasattr(self.file, "seekable"):
            return self.file.seekable()
        if hasattr(self.file, "seek"):
            return True
        return False

    def seek(self, *args: t.Any) -> None:
        if hasattr(self.file, "seek"):
            self.file.seek(*args)

    def tell(self) -> t.Optional[int]:
        if hasattr(self.file, "tell"):
            return self.file.tell()
        return None

    def __iter__(self) -> "FileWrapper":
        return self

    def __next__(self) -> bytes:
        data = self.file.read(self.buffer_size)
        if data:
            return data
        raise StopIteration()


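# Illustrative usage sketch (added comment, not part of the original module):
# iterating a BytesIO in 4-byte blocks. ``wrap_file`` would prefer the
# server's ``wsgi.file_wrapper`` when one is present in the environ.
#
# >>> import io
# >>> list(FileWrapper(io.BytesIO(b"abcdefgh"), buffer_size=4))
# [b'abcd', b'efgh']

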
class _RangeWrapper:
    # private for now, but should we make it public in the future?

    """This class can be used to convert an iterable object into
    an iterable that will only yield a piece of the underlying content.
    It yields blocks until the underlying stream range is fully read.
    The yielded blocks will never be larger than the blocks produced by
    the underlying iterator, but they can be smaller.

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param iterable: an iterable object with a :meth:`__next__` method.
    :param start_byte: byte from which read will start.
    :param byte_range: how many bytes to read.
    """

    def __init__(
        self,
        iterable: t.Union[t.Iterable[bytes], t.IO[bytes]],
        start_byte: int = 0,
        byte_range: t.Optional[int] = None,
    ):
        self.iterable = iter(iterable)
        self.byte_range = byte_range
        self.start_byte = start_byte
        self.end_byte = None

        if byte_range is not None:
            self.end_byte = start_byte + byte_range

        self.read_length = 0
        self.seekable = (
            hasattr(iterable, "seekable") and iterable.seekable()  # type: ignore
        )
        self.end_reached = False

    def __iter__(self) -> "_RangeWrapper":
        return self

    def _next_chunk(self) -> bytes:
        try:
            chunk = next(self.iterable)
            self.read_length += len(chunk)
            return chunk
        except StopIteration:
            self.end_reached = True
            raise

    def _first_iteration(self) -> t.Tuple[t.Optional[bytes], int]:
        chunk = None
        if self.seekable:
            self.iterable.seek(self.start_byte)  # type: ignore
            self.read_length = self.iterable.tell()  # type: ignore
            contextual_read_length = self.read_length
        else:
            while self.read_length <= self.start_byte:
                chunk = self._next_chunk()
            if chunk is not None:
                chunk = chunk[self.start_byte - self.read_length :]
            contextual_read_length = self.start_byte
        return chunk, contextual_read_length

    def _next(self) -> bytes:
        if self.end_reached:
            raise StopIteration()
        chunk = None
        contextual_read_length = self.read_length
        if self.read_length == 0:
            chunk, contextual_read_length = self._first_iteration()
        if chunk is None:
            chunk = self._next_chunk()
        if self.end_byte is not None and self.read_length >= self.end_byte:
            self.end_reached = True
            return chunk[: self.end_byte - contextual_read_length]
        return chunk

    def __next__(self) -> bytes:
        chunk = self._next()
        if chunk:
            return chunk
        self.end_reached = True
        raise StopIteration()

    def close(self) -> None:
        if hasattr(self.iterable, "close"):
            self.iterable.close()  # type: ignore


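# Illustrative usage sketch (added comment, not part of the original module):
# yielding only bytes 6..10 of a seekable stream, the way a range response
# body would be served.
#
# >>> import io
# >>> b"".join(_RangeWrapper(io.BytesIO(b"hello world"), start_byte=6, byte_range=5))
# b'world'

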
def _make_chunk_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    limit: t.Optional[int],
    buffer_size: int,
) -> t.Iterator[bytes]:
    """Helper for the line and chunk iter functions."""
    if isinstance(stream, (bytes, bytearray, str)):
        raise TypeError(
            "Passed a string or byte object instead of true iterator or stream."
        )
    if not hasattr(stream, "read"):
        for item in stream:
            if item:
                yield item
        return
    stream = t.cast(t.IO[bytes], stream)
    if not isinstance(stream, LimitedStream) and limit is not None:
        stream = t.cast(t.IO[bytes], LimitedStream(stream, limit))
    _read = stream.read
    while True:
        item = _read(buffer_size)
        if not item:
            break
        yield item


def make_line_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Safely iterates over the lines of an input stream. If the input
    stream is not a :class:`LimitedStream` the `limit` parameter is
    mandatory.

    This uses the stream's :meth:`~file.read` method internally as opposed
    to the :meth:`~file.readline` method that is unsafe and can only be used
    in violation of the WSGI specification. The same problem applies to the
    `__iter__` method of the input stream, which calls :meth:`~file.readline`
    without arguments.

    If you need line-by-line processing it's strongly recommended to iterate
    over the input stream using this helper function.

    .. versionchanged:: 0.8
        This function now ensures that the limit was reached.

    .. versionadded:: 0.9
        added support for iterators as input stream.

    .. versionadded:: 0.11.10
        added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterable to iterate over.
    :param limit: the limit in bytes for the stream. (Usually the
        content length. Not necessary if the `stream` is a
        :class:`LimitedStream`.)
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set, chunks are split if they are longer
        than the buffer size. Internally the buffer size may still be
        exceeded by a factor of two, however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    first_item = next(_iter, "")
    if not first_item:
        return

    s = _make_encode_wrapper(first_item)
    empty = t.cast(bytes, s(""))
    cr = t.cast(bytes, s("\r"))
    lf = t.cast(bytes, s("\n"))
    crlf = t.cast(bytes, s("\r\n"))

    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))

    def _iter_basic_lines() -> t.Iterator[bytes]:
        _join = empty.join
        buffer: t.List[bytes] = []
        while True:
            new_data = next(_iter, "")
            if not new_data:
                break
            new_buf: t.List[bytes] = []
            buf_size = 0
            for item in t.cast(
                t.Iterator[bytes], chain(buffer, new_data.splitlines(True))
            ):
                new_buf.append(item)
                buf_size += len(item)
                if item and item[-1:] in crlf:
                    yield _join(new_buf)
                    new_buf = []
                elif cap_at_buffer and buf_size >= buffer_size:
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
            buffer = new_buf
        if buffer:
            yield _join(buffer)

    # This hackery is necessary to merge 'foo\r' and '\n' into one item
    # of 'foo\r\n' if we were unlucky and hit a chunk boundary.
    previous = empty
    for item in _iter_basic_lines():
        if item == lf and previous[-1:] == cr:
            previous += item
            item = empty
        if previous:
            yield previous
        previous = item
    if previous:
        yield previous


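# Illustrative usage sketch (added comment, not part of the original module):
# line endings are preserved in the yielded items.
#
# >>> import io
# >>> list(make_line_iter(io.BytesIO(b"a\nb\r\nc"), limit=6))
# [b'a\n', b'b\r\n', b'c']

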
def make_chunk_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    separator: bytes,
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Works like :func:`make_line_iter` but accepts a separator
    which divides chunks. If you want newline based processing
    you should use :func:`make_line_iter` instead as it
    supports arbitrary newline markers.

    .. versionadded:: 0.8

    .. versionadded:: 0.9
        added support for iterators as input stream.

    .. versionadded:: 0.11.10
        added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterable to iterate over.
    :param separator: the separator that divides chunks.
    :param limit: the limit in bytes for the stream. (Usually the
        content length. Not necessary if the `stream` is otherwise
        already limited.)
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set, chunks are split if they are longer
        than the buffer size. Internally the buffer size may still be
        exceeded by a factor of two, however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    first_item = next(_iter, b"")
    if not first_item:
        return

    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
    if isinstance(first_item, str):
        separator = _to_str(separator)
        _split = re.compile(f"({re.escape(separator)})").split
        _join = "".join
    else:
        separator = _to_bytes(separator)
        _split = re.compile(b"(" + re.escape(separator) + b")").split
        _join = b"".join

    buffer: t.List[bytes] = []
    while True:
        new_data = next(_iter, b"")
        if not new_data:
            break
        chunks = _split(new_data)
        new_buf: t.List[bytes] = []
        buf_size = 0
        for item in chain(buffer, chunks):
            if item == separator:
                yield _join(new_buf)
                new_buf = []
                buf_size = 0
            else:
                buf_size += len(item)
                new_buf.append(item)

                if cap_at_buffer and buf_size >= buffer_size:
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
                    buf_size = len(rv)

        buffer = new_buf
    if buffer:
        yield _join(buffer)


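# Illustrative usage sketch (added comment, not part of the original module):
# splitting a limited stream on a custom separator; the separator itself is
# not part of the yielded chunks.
#
# >>> import io
# >>> list(make_chunk_iter(io.BytesIO(b"a|b|c"), b"|", limit=5))
# [b'a', b'b', b'c']

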
class LimitedStream(io.IOBase):
    """Wraps a stream so that it doesn't read more than n bytes. If the
    stream is exhausted and the caller tries to get more bytes from it
    :func:`on_exhausted` is called which by default returns an empty
    string. The return value of that function is forwarded
    to the reader function. So if it returns an empty string
    :meth:`read` will return an empty string as well.

    The limit however must never be higher than what the stream can
    output. Otherwise :meth:`readlines` will try to read past the
    limit.

    .. admonition:: Note on WSGI compliance

        Calls to :meth:`readline` and :meth:`readlines` are not
        WSGI compliant because they pass a size argument to the
        readline methods. Unfortunately the WSGI PEP is not safely
        implementable without a size argument to :meth:`readline`
        because there is no EOF marker in the stream. As a result
        of that the use of :meth:`readline` is discouraged.

        For the same reason iterating over the :class:`LimitedStream`
        is not portable. It internally calls :meth:`readline`.

        We strongly suggest using :meth:`read` only or using
        :func:`make_line_iter`, which safely iterates over the lines
        of a WSGI input stream.

    :param stream: the stream to wrap.
    :param limit: the limit for the stream, must not be longer than
        what the stream can provide if the stream does not
        end with `EOF` (like `wsgi.input`)
    """

    def __init__(self, stream: t.IO[bytes], limit: int) -> None:
        self._read = stream.read
        self._readline = stream.readline
        self._pos = 0
        self.limit = limit

    def __iter__(self) -> "LimitedStream":
        return self

    @property
    def is_exhausted(self) -> bool:
        """If the stream is exhausted this attribute is `True`."""
        return self._pos >= self.limit

    def on_exhausted(self) -> bytes:
        """This is called when the stream tries to read past the limit.
        The return value of this function is returned from the reading
        function.
        """
        # Read null bytes from the stream so that we get the
        # correct end of stream marker.
        return self._read(0)

    def on_disconnect(self) -> bytes:
        """What should happen if a disconnect is detected? The return
        value of this function is returned from read functions in case
        the client went away. By default a
        :exc:`~werkzeug.exceptions.ClientDisconnected` exception is raised.
        """
        from .exceptions import ClientDisconnected

        raise ClientDisconnected()

    def exhaust(self, chunk_size: int = 1024 * 64) -> None:
        """Exhaust the stream. This consumes all the data left until the
        limit is reached.

        :param chunk_size: the size for a chunk. It will read the chunk
            until the stream is exhausted and throw away
            the results.
        """
        to_read = self.limit - self._pos
        chunk = chunk_size
        while to_read > 0:
            chunk = min(to_read, chunk)
            self.read(chunk)
            to_read -= chunk

    def read(self, size: t.Optional[int] = None) -> bytes:
        """Read `size` bytes or if size is not provided everything is read.

        :param size: the number of bytes read.
        """
        if self._pos >= self.limit:
            return self.on_exhausted()
        if size is None or size == -1:  # -1 is for consistency with file objects
            size = self.limit
        to_read = min(self.limit - self._pos, size)
        try:
            read = self._read(to_read)
        except (OSError, ValueError):
            return self.on_disconnect()
        if to_read and len(read) != to_read:
            return self.on_disconnect()
        self._pos += len(read)
        return read

    def readline(self, size: t.Optional[int] = None) -> bytes:
        """Reads one line from the stream."""
        if self._pos >= self.limit:
            return self.on_exhausted()
        if size is None:
            size = self.limit - self._pos
        else:
            size = min(size, self.limit - self._pos)
        try:
            line = self._readline(size)
        except (ValueError, OSError):
            return self.on_disconnect()
        if size and not line:
            return self.on_disconnect()
        self._pos += len(line)
        return line

    def readlines(self, size: t.Optional[int] = None) -> t.List[bytes]:
        """Reads a file into a list of strings. It calls :meth:`readline`
        until the file is read to the end. It does support the optional
        `size` argument if the underlying stream supports it for
        `readline`.
        """
        last_pos = self._pos
        result = []
        if size is not None:
            end = min(self.limit, last_pos + size)
        else:
            end = self.limit
        while True:
            if size is not None:
                size -= last_pos - self._pos
            if self._pos >= end:
                break
            result.append(self.readline(size))
            if size is not None:
                last_pos = self._pos
        return result

    def tell(self) -> int:
        """Returns the position of the stream.

        .. versionadded:: 0.9
        """
        return self._pos

    def __next__(self) -> bytes:
        line = self.readline()
        if not line:
            raise StopIteration()
        return line

    def readable(self) -> bool:
        return True


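# Illustrative usage sketch (added comment, not part of the original module):
# limiting a raw input stream to the declared content length.
#
# >>> import io
# >>> body = LimitedStream(io.BytesIO(b"hello world"), 5)
# >>> body.read()
# b'hello'
# >>> body.read()
# b''
# >>> body.is_exhausted
# True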