Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/wsgi.py: 17%
396 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1import io
2import re
3import typing as t
4import warnings
5from functools import partial
6from functools import update_wrapper
7from itertools import chain
9from ._internal import _make_encode_wrapper
10from ._internal import _to_bytes
11from ._internal import _to_str
12from .sansio import utils as _sansio_utils
13from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API
14from .urls import _URLTuple
15from .urls import uri_to_iri
16from .urls import url_join
17from .urls import url_parse
18from .urls import url_quote
20if t.TYPE_CHECKING:
21 from _typeshed.wsgi import WSGIApplication
22 from _typeshed.wsgi import WSGIEnvironment
25def responder(f: t.Callable[..., "WSGIApplication"]) -> "WSGIApplication":
26 """Marks a function as responder. Decorate a function with it and it
27 will automatically call the return value as WSGI application.
29 Example::
31 @responder
32 def application(environ, start_response):
33 return Response('Hello World!')
34 """
35 return update_wrapper(lambda *a: f(*a)(*a[-2:]), f)
38def get_current_url(
39 environ: "WSGIEnvironment",
40 root_only: bool = False,
41 strip_querystring: bool = False,
42 host_only: bool = False,
43 trusted_hosts: t.Optional[t.Iterable[str]] = None,
44) -> str:
45 """Recreate the URL for a request from the parts in a WSGI
46 environment.
48 The URL is an IRI, not a URI, so it may contain Unicode characters.
49 Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.
51 :param environ: The WSGI environment to get the URL parts from.
52 :param root_only: Only build the root path, don't include the
53 remaining path or query string.
54 :param strip_querystring: Don't include the query string.
55 :param host_only: Only build the scheme and host.
56 :param trusted_hosts: A list of trusted host names to validate the
57 host against.
58 """
59 parts = {
60 "scheme": environ["wsgi.url_scheme"],
61 "host": get_host(environ, trusted_hosts),
62 }
64 if not host_only:
65 parts["root_path"] = environ.get("SCRIPT_NAME", "")
67 if not root_only:
68 parts["path"] = environ.get("PATH_INFO", "")
70 if not strip_querystring:
71 parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")
73 return _sansio_utils.get_current_url(**parts)
76def _get_server(
77 environ: "WSGIEnvironment",
78) -> t.Optional[t.Tuple[str, t.Optional[int]]]:
79 name = environ.get("SERVER_NAME")
81 if name is None:
82 return None
84 try:
85 port: t.Optional[int] = int(environ.get("SERVER_PORT", None))
86 except (TypeError, ValueError):
87 # unix socket
88 port = None
90 return name, port
93def get_host(
94 environ: "WSGIEnvironment", trusted_hosts: t.Optional[t.Iterable[str]] = None
95) -> str:
96 """Return the host for the given WSGI environment.
98 The ``Host`` header is preferred, then ``SERVER_NAME`` if it's not
99 set. The returned host will only contain the port if it is different
100 than the standard port for the protocol.
102 Optionally, verify that the host is trusted using
103 :func:`host_is_trusted` and raise a
104 :exc:`~werkzeug.exceptions.SecurityError` if it is not.
106 :param environ: A WSGI environment dict.
107 :param trusted_hosts: A list of trusted host names.
109 :return: Host, with port if necessary.
110 :raise ~werkzeug.exceptions.SecurityError: If the host is not
111 trusted.
112 """
113 return _sansio_utils.get_host(
114 environ["wsgi.url_scheme"],
115 environ.get("HTTP_HOST"),
116 _get_server(environ),
117 trusted_hosts,
118 )
121def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]:
122 """Returns the content length from the WSGI environment as
123 integer. If it's not available or chunked transfer encoding is used,
124 ``None`` is returned.
126 .. versionadded:: 0.9
128 :param environ: the WSGI environ to fetch the content length from.
129 """
130 return _sansio_utils.get_content_length(
131 http_content_length=environ.get("CONTENT_LENGTH"),
132 http_transfer_encoding=environ.get("HTTP_TRANSFER_ENCODING", ""),
133 )
136def get_input_stream(
137 environ: "WSGIEnvironment", safe_fallback: bool = True
138) -> t.IO[bytes]:
139 """Returns the input stream from the WSGI environment and wraps it
140 in the most sensible way possible. The stream returned is not the
141 raw WSGI stream in most cases but one that is safe to read from
142 without taking into account the content length.
144 If content length is not set, the stream will be empty for safety reasons.
145 If the WSGI server supports chunked or infinite streams, it should set
146 the ``wsgi.input_terminated`` value in the WSGI environ to indicate that.
148 .. versionadded:: 0.9
150 :param environ: the WSGI environ to fetch the stream from.
151 :param safe_fallback: use an empty stream as a safe fallback when the
152 content length is not set. Disabling this allows infinite streams,
153 which can be a denial-of-service risk.
154 """
155 stream = t.cast(t.IO[bytes], environ["wsgi.input"])
156 content_length = get_content_length(environ)
158 # A wsgi extension that tells us if the input is terminated. In
159 # that case we return the stream unchanged as we know we can safely
160 # read it until the end.
161 if environ.get("wsgi.input_terminated"):
162 return stream
164 # If the request doesn't specify a content length, returning the stream is
165 # potentially dangerous because it could be infinite, malicious or not. If
166 # safe_fallback is true, return an empty stream instead for safety.
167 if content_length is None:
168 return io.BytesIO() if safe_fallback else stream
170 # Otherwise limit the stream to the content length
171 return t.cast(t.IO[bytes], LimitedStream(stream, content_length))
174def get_query_string(environ: "WSGIEnvironment") -> str:
175 """Returns the ``QUERY_STRING`` from the WSGI environment. This also
176 takes care of the WSGI decoding dance. The string returned will be
177 restricted to ASCII characters.
179 :param environ: WSGI environment to get the query string from.
181 .. deprecated:: 2.2
182 Will be removed in Werkzeug 2.3.
184 .. versionadded:: 0.9
185 """
186 warnings.warn(
187 "'get_query_string' is deprecated and will be removed in Werkzeug 2.3.",
188 DeprecationWarning,
189 stacklevel=2,
190 )
191 qs = environ.get("QUERY_STRING", "").encode("latin1")
192 # QUERY_STRING really should be ascii safe but some browsers
193 # will send us some unicode stuff (I am looking at you IE).
194 # In that case we want to urllib quote it badly.
195 return url_quote(qs, safe=":&%=+$!*'(),")
198def get_path_info(
199 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
200) -> str:
201 """Return the ``PATH_INFO`` from the WSGI environment and decode it
202 unless ``charset`` is ``None``.
204 :param environ: WSGI environment to get the path from.
205 :param charset: The charset for the path info, or ``None`` if no
206 decoding should be performed.
207 :param errors: The decoding error handling.
209 .. versionadded:: 0.9
210 """
211 path = environ.get("PATH_INFO", "").encode("latin1")
212 return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore
215def get_script_name(
216 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
217) -> str:
218 """Return the ``SCRIPT_NAME`` from the WSGI environment and decode
219 it unless `charset` is set to ``None``.
221 :param environ: WSGI environment to get the path from.
222 :param charset: The charset for the path, or ``None`` if no decoding
223 should be performed.
224 :param errors: The decoding error handling.
226 .. deprecated:: 2.2
227 Will be removed in Werkzeug 2.3.
229 .. versionadded:: 0.9
230 """
231 warnings.warn(
232 "'get_script_name' is deprecated and will be removed in Werkzeug 2.3.",
233 DeprecationWarning,
234 stacklevel=2,
235 )
236 path = environ.get("SCRIPT_NAME", "").encode("latin1")
237 return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore
240def pop_path_info(
241 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
242) -> t.Optional[str]:
243 """Removes and returns the next segment of `PATH_INFO`, pushing it onto
244 `SCRIPT_NAME`. Returns `None` if there is nothing left on `PATH_INFO`.
246 If the `charset` is set to `None` bytes are returned.
248 If there are empty segments (``'/foo//bar``) these are ignored but
249 properly pushed to the `SCRIPT_NAME`:
251 >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
252 >>> pop_path_info(env)
253 'a'
254 >>> env['SCRIPT_NAME']
255 '/foo/a'
256 >>> pop_path_info(env)
257 'b'
258 >>> env['SCRIPT_NAME']
259 '/foo/a/b'
261 .. deprecated:: 2.2
262 Will be removed in Werkzeug 2.3.
264 .. versionadded:: 0.5
266 .. versionchanged:: 0.9
267 The path is now decoded and a charset and encoding
268 parameter can be provided.
270 :param environ: the WSGI environment that is modified.
271 :param charset: The ``encoding`` parameter passed to
272 :func:`bytes.decode`.
273 :param errors: The ``errors`` paramater passed to
274 :func:`bytes.decode`.
275 """
276 warnings.warn(
277 "'pop_path_info' is deprecated and will be removed in Werkzeug 2.3.",
278 DeprecationWarning,
279 stacklevel=2,
280 )
282 path = environ.get("PATH_INFO")
283 if not path:
284 return None
286 script_name = environ.get("SCRIPT_NAME", "")
288 # shift multiple leading slashes over
289 old_path = path
290 path = path.lstrip("/")
291 if path != old_path:
292 script_name += "/" * (len(old_path) - len(path))
294 if "/" not in path:
295 environ["PATH_INFO"] = ""
296 environ["SCRIPT_NAME"] = script_name + path
297 rv = path.encode("latin1")
298 else:
299 segment, path = path.split("/", 1)
300 environ["PATH_INFO"] = f"/{path}"
301 environ["SCRIPT_NAME"] = script_name + segment
302 rv = segment.encode("latin1")
304 return _to_str(rv, charset, errors, allow_none_charset=True) # type: ignore
307def peek_path_info(
308 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
309) -> t.Optional[str]:
310 """Returns the next segment on the `PATH_INFO` or `None` if there
311 is none. Works like :func:`pop_path_info` without modifying the
312 environment:
314 >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
315 >>> peek_path_info(env)
316 'a'
317 >>> peek_path_info(env)
318 'a'
320 If the `charset` is set to `None` bytes are returned.
322 .. deprecated:: 2.2
323 Will be removed in Werkzeug 2.3.
325 .. versionadded:: 0.5
327 .. versionchanged:: 0.9
328 The path is now decoded and a charset and encoding
329 parameter can be provided.
331 :param environ: the WSGI environment that is checked.
332 """
333 warnings.warn(
334 "'peek_path_info' is deprecated and will be removed in Werkzeug 2.3.",
335 DeprecationWarning,
336 stacklevel=2,
337 )
339 segments = environ.get("PATH_INFO", "").lstrip("/").split("/", 1)
340 if segments:
341 return _to_str( # type: ignore
342 segments[0].encode("latin1"), charset, errors, allow_none_charset=True
343 )
344 return None
347def extract_path_info(
348 environ_or_baseurl: t.Union[str, "WSGIEnvironment"],
349 path_or_url: t.Union[str, _URLTuple],
350 charset: str = "utf-8",
351 errors: str = "werkzeug.url_quote",
352 collapse_http_schemes: bool = True,
353) -> t.Optional[str]:
354 """Extracts the path info from the given URL (or WSGI environment) and
355 path. The path info returned is a string. The URLs might also be IRIs.
357 If the path info could not be determined, `None` is returned.
359 Some examples:
361 >>> extract_path_info('http://example.com/app', '/app/hello')
362 '/hello'
363 >>> extract_path_info('http://example.com/app',
364 ... 'https://example.com/app/hello')
365 '/hello'
366 >>> extract_path_info('http://example.com/app',
367 ... 'https://example.com/app/hello',
368 ... collapse_http_schemes=False) is None
369 True
371 Instead of providing a base URL you can also pass a WSGI environment.
373 :param environ_or_baseurl: a WSGI environment dict, a base URL or
374 base IRI. This is the root of the
375 application.
376 :param path_or_url: an absolute path from the server root, a
377 relative path (in which case it's the path info)
378 or a full URL.
379 :param charset: the charset for byte data in URLs
380 :param errors: the error handling on decode
381 :param collapse_http_schemes: if set to `False` the algorithm does
382 not assume that http and https on the
383 same server point to the same
384 resource.
386 .. deprecated:: 2.2
387 Will be removed in Werkzeug 2.3.
389 .. versionchanged:: 0.15
390 The ``errors`` parameter defaults to leaving invalid bytes
391 quoted instead of replacing them.
393 .. versionadded:: 0.6
395 """
396 warnings.warn(
397 "'extract_path_info' is deprecated and will be removed in Werkzeug 2.3.",
398 DeprecationWarning,
399 stacklevel=2,
400 )
402 def _normalize_netloc(scheme: str, netloc: str) -> str:
403 parts = netloc.split("@", 1)[-1].split(":", 1)
404 port: t.Optional[str]
406 if len(parts) == 2:
407 netloc, port = parts
408 if (scheme == "http" and port == "80") or (
409 scheme == "https" and port == "443"
410 ):
411 port = None
412 else:
413 netloc = parts[0]
414 port = None
416 if port is not None:
417 netloc += f":{port}"
419 return netloc
421 # make sure whatever we are working on is a IRI and parse it
422 path = uri_to_iri(path_or_url, charset, errors)
423 if isinstance(environ_or_baseurl, dict):
424 environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True)
425 base_iri = uri_to_iri(environ_or_baseurl, charset, errors)
426 base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
427 cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]
429 # normalize the network location
430 base_netloc = _normalize_netloc(base_scheme, base_netloc)
431 cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)
433 # is that IRI even on a known HTTP scheme?
434 if collapse_http_schemes:
435 for scheme in base_scheme, cur_scheme:
436 if scheme not in ("http", "https"):
437 return None
438 else:
439 if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
440 return None
442 # are the netlocs compatible?
443 if base_netloc != cur_netloc:
444 return None
446 # are we below the application path?
447 base_path = base_path.rstrip("/")
448 if not cur_path.startswith(base_path):
449 return None
451 return f"/{cur_path[len(base_path) :].lstrip('/')}"
454class ClosingIterator:
455 """The WSGI specification requires that all middlewares and gateways
456 respect the `close` callback of the iterable returned by the application.
457 Because it is useful to add another close action to a returned iterable
458 and adding a custom iterable is a boring task this class can be used for
459 that::
461 return ClosingIterator(app(environ, start_response), [cleanup_session,
462 cleanup_locals])
464 If there is just one close function it can be passed instead of the list.
466 A closing iterator is not needed if the application uses response objects
467 and finishes the processing if the response is started::
469 try:
470 return response(environ, start_response)
471 finally:
472 cleanup_session()
473 cleanup_locals()
474 """
476 def __init__(
477 self,
478 iterable: t.Iterable[bytes],
479 callbacks: t.Optional[
480 t.Union[t.Callable[[], None], t.Iterable[t.Callable[[], None]]]
481 ] = None,
482 ) -> None:
483 iterator = iter(iterable)
484 self._next = t.cast(t.Callable[[], bytes], partial(next, iterator))
485 if callbacks is None:
486 callbacks = []
487 elif callable(callbacks):
488 callbacks = [callbacks]
489 else:
490 callbacks = list(callbacks)
491 iterable_close = getattr(iterable, "close", None)
492 if iterable_close:
493 callbacks.insert(0, iterable_close)
494 self._callbacks = callbacks
496 def __iter__(self) -> "ClosingIterator":
497 return self
499 def __next__(self) -> bytes:
500 return self._next()
502 def close(self) -> None:
503 for callback in self._callbacks:
504 callback()
507def wrap_file(
508 environ: "WSGIEnvironment", file: t.IO[bytes], buffer_size: int = 8192
509) -> t.Iterable[bytes]:
510 """Wraps a file. This uses the WSGI server's file wrapper if available
511 or otherwise the generic :class:`FileWrapper`.
513 .. versionadded:: 0.5
515 If the file wrapper from the WSGI server is used it's important to not
516 iterate over it from inside the application but to pass it through
517 unchanged. If you want to pass out a file wrapper inside a response
518 object you have to set :attr:`Response.direct_passthrough` to `True`.
520 More information about file wrappers are available in :pep:`333`.
522 :param file: a :class:`file`-like object with a :meth:`~file.read` method.
523 :param buffer_size: number of bytes for one iteration.
524 """
525 return environ.get("wsgi.file_wrapper", FileWrapper)( # type: ignore
526 file, buffer_size
527 )
530class FileWrapper:
531 """This class can be used to convert a :class:`file`-like object into
532 an iterable. It yields `buffer_size` blocks until the file is fully
533 read.
535 You should not use this class directly but rather use the
536 :func:`wrap_file` function that uses the WSGI server's file wrapper
537 support if it's available.
539 .. versionadded:: 0.5
541 If you're using this object together with a :class:`Response` you have
542 to use the `direct_passthrough` mode.
544 :param file: a :class:`file`-like object with a :meth:`~file.read` method.
545 :param buffer_size: number of bytes for one iteration.
546 """
548 def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
549 self.file = file
550 self.buffer_size = buffer_size
552 def close(self) -> None:
553 if hasattr(self.file, "close"):
554 self.file.close()
556 def seekable(self) -> bool:
557 if hasattr(self.file, "seekable"):
558 return self.file.seekable()
559 if hasattr(self.file, "seek"):
560 return True
561 return False
563 def seek(self, *args: t.Any) -> None:
564 if hasattr(self.file, "seek"):
565 self.file.seek(*args)
567 def tell(self) -> t.Optional[int]:
568 if hasattr(self.file, "tell"):
569 return self.file.tell()
570 return None
572 def __iter__(self) -> "FileWrapper":
573 return self
575 def __next__(self) -> bytes:
576 data = self.file.read(self.buffer_size)
577 if data:
578 return data
579 raise StopIteration()
582class _RangeWrapper:
583 # private for now, but should we make it public in the future ?
585 """This class can be used to convert an iterable object into
586 an iterable that will only yield a piece of the underlying content.
587 It yields blocks until the underlying stream range is fully read.
588 The yielded blocks will have a size that can't exceed the original
589 iterator defined block size, but that can be smaller.
591 If you're using this object together with a :class:`Response` you have
592 to use the `direct_passthrough` mode.
594 :param iterable: an iterable object with a :meth:`__next__` method.
595 :param start_byte: byte from which read will start.
596 :param byte_range: how many bytes to read.
597 """
599 def __init__(
600 self,
601 iterable: t.Union[t.Iterable[bytes], t.IO[bytes]],
602 start_byte: int = 0,
603 byte_range: t.Optional[int] = None,
604 ):
605 self.iterable = iter(iterable)
606 self.byte_range = byte_range
607 self.start_byte = start_byte
608 self.end_byte = None
610 if byte_range is not None:
611 self.end_byte = start_byte + byte_range
613 self.read_length = 0
614 self.seekable = (
615 hasattr(iterable, "seekable") and iterable.seekable() # type: ignore
616 )
617 self.end_reached = False
619 def __iter__(self) -> "_RangeWrapper":
620 return self
622 def _next_chunk(self) -> bytes:
623 try:
624 chunk = next(self.iterable)
625 self.read_length += len(chunk)
626 return chunk
627 except StopIteration:
628 self.end_reached = True
629 raise
631 def _first_iteration(self) -> t.Tuple[t.Optional[bytes], int]:
632 chunk = None
633 if self.seekable:
634 self.iterable.seek(self.start_byte) # type: ignore
635 self.read_length = self.iterable.tell() # type: ignore
636 contextual_read_length = self.read_length
637 else:
638 while self.read_length <= self.start_byte:
639 chunk = self._next_chunk()
640 if chunk is not None:
641 chunk = chunk[self.start_byte - self.read_length :]
642 contextual_read_length = self.start_byte
643 return chunk, contextual_read_length
645 def _next(self) -> bytes:
646 if self.end_reached:
647 raise StopIteration()
648 chunk = None
649 contextual_read_length = self.read_length
650 if self.read_length == 0:
651 chunk, contextual_read_length = self._first_iteration()
652 if chunk is None:
653 chunk = self._next_chunk()
654 if self.end_byte is not None and self.read_length >= self.end_byte:
655 self.end_reached = True
656 return chunk[: self.end_byte - contextual_read_length]
657 return chunk
659 def __next__(self) -> bytes:
660 chunk = self._next()
661 if chunk:
662 return chunk
663 self.end_reached = True
664 raise StopIteration()
666 def close(self) -> None:
667 if hasattr(self.iterable, "close"):
668 self.iterable.close() # type: ignore
671def _make_chunk_iter(
672 stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
673 limit: t.Optional[int],
674 buffer_size: int,
675) -> t.Iterator[bytes]:
676 """Helper for the line and chunk iter functions."""
677 if isinstance(stream, (bytes, bytearray, str)):
678 raise TypeError(
679 "Passed a string or byte object instead of true iterator or stream."
680 )
681 if not hasattr(stream, "read"):
682 for item in stream:
683 if item:
684 yield item
685 return
686 stream = t.cast(t.IO[bytes], stream)
687 if not isinstance(stream, LimitedStream) and limit is not None:
688 stream = t.cast(t.IO[bytes], LimitedStream(stream, limit))
689 _read = stream.read
690 while True:
691 item = _read(buffer_size)
692 if not item:
693 break
694 yield item
697def make_line_iter(
698 stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
699 limit: t.Optional[int] = None,
700 buffer_size: int = 10 * 1024,
701 cap_at_buffer: bool = False,
702) -> t.Iterator[bytes]:
703 """Safely iterates line-based over an input stream. If the input stream
704 is not a :class:`LimitedStream` the `limit` parameter is mandatory.
706 This uses the stream's :meth:`~file.read` method internally as opposite
707 to the :meth:`~file.readline` method that is unsafe and can only be used
708 in violation of the WSGI specification. The same problem applies to the
709 `__iter__` function of the input stream which calls :meth:`~file.readline`
710 without arguments.
712 If you need line-by-line processing it's strongly recommended to iterate
713 over the input stream using this helper function.
715 .. versionchanged:: 0.8
716 This function now ensures that the limit was reached.
718 .. versionadded:: 0.9
719 added support for iterators as input stream.
721 .. versionadded:: 0.11.10
722 added support for the `cap_at_buffer` parameter.
724 :param stream: the stream or iterate to iterate over.
725 :param limit: the limit in bytes for the stream. (Usually
726 content length. Not necessary if the `stream`
727 is a :class:`LimitedStream`.
728 :param buffer_size: The optional buffer size.
729 :param cap_at_buffer: if this is set chunks are split if they are longer
730 than the buffer size. Internally this is implemented
731 that the buffer size might be exhausted by a factor
732 of two however.
733 """
734 _iter = _make_chunk_iter(stream, limit, buffer_size)
736 first_item = next(_iter, "")
737 if not first_item:
738 return
740 s = _make_encode_wrapper(first_item)
741 empty = t.cast(bytes, s(""))
742 cr = t.cast(bytes, s("\r"))
743 lf = t.cast(bytes, s("\n"))
744 crlf = t.cast(bytes, s("\r\n"))
746 _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
748 def _iter_basic_lines() -> t.Iterator[bytes]:
749 _join = empty.join
750 buffer: t.List[bytes] = []
751 while True:
752 new_data = next(_iter, "")
753 if not new_data:
754 break
755 new_buf: t.List[bytes] = []
756 buf_size = 0
757 for item in t.cast(
758 t.Iterator[bytes], chain(buffer, new_data.splitlines(True))
759 ):
760 new_buf.append(item)
761 buf_size += len(item)
762 if item and item[-1:] in crlf:
763 yield _join(new_buf)
764 new_buf = []
765 elif cap_at_buffer and buf_size >= buffer_size:
766 rv = _join(new_buf)
767 while len(rv) >= buffer_size:
768 yield rv[:buffer_size]
769 rv = rv[buffer_size:]
770 new_buf = [rv]
771 buffer = new_buf
772 if buffer:
773 yield _join(buffer)
775 # This hackery is necessary to merge 'foo\r' and '\n' into one item
776 # of 'foo\r\n' if we were unlucky and we hit a chunk boundary.
777 previous = empty
778 for item in _iter_basic_lines():
779 if item == lf and previous[-1:] == cr:
780 previous += item
781 item = empty
782 if previous:
783 yield previous
784 previous = item
785 if previous:
786 yield previous
789def make_chunk_iter(
790 stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
791 separator: bytes,
792 limit: t.Optional[int] = None,
793 buffer_size: int = 10 * 1024,
794 cap_at_buffer: bool = False,
795) -> t.Iterator[bytes]:
796 """Works like :func:`make_line_iter` but accepts a separator
797 which divides chunks. If you want newline based processing
798 you should use :func:`make_line_iter` instead as it
799 supports arbitrary newline markers.
801 .. versionadded:: 0.8
803 .. versionadded:: 0.9
804 added support for iterators as input stream.
806 .. versionadded:: 0.11.10
807 added support for the `cap_at_buffer` parameter.
809 :param stream: the stream or iterate to iterate over.
810 :param separator: the separator that divides chunks.
811 :param limit: the limit in bytes for the stream. (Usually
812 content length. Not necessary if the `stream`
813 is otherwise already limited).
814 :param buffer_size: The optional buffer size.
815 :param cap_at_buffer: if this is set chunks are split if they are longer
816 than the buffer size. Internally this is implemented
817 that the buffer size might be exhausted by a factor
818 of two however.
819 """
820 _iter = _make_chunk_iter(stream, limit, buffer_size)
822 first_item = next(_iter, b"")
823 if not first_item:
824 return
826 _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
827 if isinstance(first_item, str):
828 separator = _to_str(separator)
829 _split = re.compile(f"({re.escape(separator)})").split
830 _join = "".join
831 else:
832 separator = _to_bytes(separator)
833 _split = re.compile(b"(" + re.escape(separator) + b")").split
834 _join = b"".join
836 buffer: t.List[bytes] = []
837 while True:
838 new_data = next(_iter, b"")
839 if not new_data:
840 break
841 chunks = _split(new_data)
842 new_buf: t.List[bytes] = []
843 buf_size = 0
844 for item in chain(buffer, chunks):
845 if item == separator:
846 yield _join(new_buf)
847 new_buf = []
848 buf_size = 0
849 else:
850 buf_size += len(item)
851 new_buf.append(item)
853 if cap_at_buffer and buf_size >= buffer_size:
854 rv = _join(new_buf)
855 while len(rv) >= buffer_size:
856 yield rv[:buffer_size]
857 rv = rv[buffer_size:]
858 new_buf = [rv]
859 buf_size = len(rv)
861 buffer = new_buf
862 if buffer:
863 yield _join(buffer)
866class LimitedStream(io.IOBase):
867 """Wraps a stream so that it doesn't read more than n bytes. If the
868 stream is exhausted and the caller tries to get more bytes from it
869 :func:`on_exhausted` is called which by default returns an empty
870 string. The return value of that function is forwarded
871 to the reader function. So if it returns an empty string
872 :meth:`read` will return an empty string as well.
874 The limit however must never be higher than what the stream can
875 output. Otherwise :meth:`readlines` will try to read past the
876 limit.
878 .. admonition:: Note on WSGI compliance
880 calls to :meth:`readline` and :meth:`readlines` are not
881 WSGI compliant because it passes a size argument to the
882 readline methods. Unfortunately the WSGI PEP is not safely
883 implementable without a size argument to :meth:`readline`
884 because there is no EOF marker in the stream. As a result
885 of that the use of :meth:`readline` is discouraged.
887 For the same reason iterating over the :class:`LimitedStream`
888 is not portable. It internally calls :meth:`readline`.
890 We strongly suggest using :meth:`read` only or using the
891 :func:`make_line_iter` which safely iterates line-based
892 over a WSGI input stream.
894 :param stream: the stream to wrap.
895 :param limit: the limit for the stream, must not be longer than
896 what the string can provide if the stream does not
897 end with `EOF` (like `wsgi.input`)
898 """
900 def __init__(self, stream: t.IO[bytes], limit: int) -> None:
901 self._read = stream.read
902 self._readline = stream.readline
903 self._pos = 0
904 self.limit = limit
906 def __iter__(self) -> "LimitedStream":
907 return self
909 @property
910 def is_exhausted(self) -> bool:
911 """If the stream is exhausted this attribute is `True`."""
912 return self._pos >= self.limit
914 def on_exhausted(self) -> bytes:
915 """This is called when the stream tries to read past the limit.
916 The return value of this function is returned from the reading
917 function.
918 """
919 # Read null bytes from the stream so that we get the
920 # correct end of stream marker.
921 return self._read(0)
923 def on_disconnect(self) -> bytes:
924 """What should happen if a disconnect is detected? The return
925 value of this function is returned from read functions in case
926 the client went away. By default a
927 :exc:`~werkzeug.exceptions.ClientDisconnected` exception is raised.
928 """
929 from .exceptions import ClientDisconnected
931 raise ClientDisconnected()
933 def exhaust(self, chunk_size: int = 1024 * 64) -> None:
934 """Exhaust the stream. This consumes all the data left until the
935 limit is reached.
937 :param chunk_size: the size for a chunk. It will read the chunk
938 until the stream is exhausted and throw away
939 the results.
940 """
941 to_read = self.limit - self._pos
942 chunk = chunk_size
943 while to_read > 0:
944 chunk = min(to_read, chunk)
945 self.read(chunk)
946 to_read -= chunk
948 def read(self, size: t.Optional[int] = None) -> bytes:
949 """Read `size` bytes or if size is not provided everything is read.
951 :param size: the number of bytes read.
952 """
953 if self._pos >= self.limit:
954 return self.on_exhausted()
955 if size is None or size == -1: # -1 is for consistence with file
956 size = self.limit
957 to_read = min(self.limit - self._pos, size)
958 try:
959 read = self._read(to_read)
960 except (OSError, ValueError):
961 return self.on_disconnect()
962 if to_read and len(read) != to_read:
963 return self.on_disconnect()
964 self._pos += len(read)
965 return read
967 def readline(self, size: t.Optional[int] = None) -> bytes:
968 """Reads one line from the stream."""
969 if self._pos >= self.limit:
970 return self.on_exhausted()
971 if size is None:
972 size = self.limit - self._pos
973 else:
974 size = min(size, self.limit - self._pos)
975 try:
976 line = self._readline(size)
977 except (ValueError, OSError):
978 return self.on_disconnect()
979 if size and not line:
980 return self.on_disconnect()
981 self._pos += len(line)
982 return line
984 def readlines(self, size: t.Optional[int] = None) -> t.List[bytes]:
985 """Reads a file into a list of strings. It calls :meth:`readline`
986 until the file is read to the end. It does support the optional
987 `size` argument if the underlying stream supports it for
988 `readline`.
989 """
990 last_pos = self._pos
991 result = []
992 if size is not None:
993 end = min(self.limit, last_pos + size)
994 else:
995 end = self.limit
996 while True:
997 if size is not None:
998 size -= last_pos - self._pos
999 if self._pos >= end:
1000 break
1001 result.append(self.readline(size))
1002 if size is not None:
1003 last_pos = self._pos
1004 return result
1006 def tell(self) -> int:
1007 """Returns the position of the stream.
1009 .. versionadded:: 0.9
1010 """
1011 return self._pos
1013 def __next__(self) -> bytes:
1014 line = self.readline()
1015 if not line:
1016 raise StopIteration()
1017 return line
1019 def readable(self) -> bool:
1020 return True