Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/wsgi.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import io
4import typing as t
5from functools import partial
6from functools import update_wrapper
8from .exceptions import ClientDisconnected
9from .exceptions import RequestEntityTooLarge
10from .sansio import utils as _sansio_utils
11from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API
13if t.TYPE_CHECKING:
14 from _typeshed.wsgi import WSGIApplication
15 from _typeshed.wsgi import WSGIEnvironment
18def responder(f: t.Callable[..., WSGIApplication]) -> WSGIApplication:
19 """Marks a function as responder. Decorate a function with it and it
20 will automatically call the return value as WSGI application.
22 Example::
24 @responder
25 def application(environ, start_response):
26 return Response('Hello World!')
27 """
28 return update_wrapper(lambda *a: f(*a)(*a[-2:]), f)
31def get_current_url(
32 environ: WSGIEnvironment,
33 root_only: bool = False,
34 strip_querystring: bool = False,
35 host_only: bool = False,
36 trusted_hosts: t.Collection[str] | None = None,
37) -> str:
38 """Recreate the URL for a request from the parts in a WSGI
39 environment.
41 The URL is an IRI, not a URI, so it may contain Unicode characters.
42 Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.
44 :param environ: The WSGI environment to get the URL parts from.
45 :param root_only: Only build the root path, don't include the
46 remaining path or query string.
47 :param strip_querystring: Don't include the query string.
48 :param host_only: Only build the scheme and host.
49 :param trusted_hosts: A list of trusted host names to validate the
50 host against.
51 """
52 parts = {
53 "scheme": environ["wsgi.url_scheme"],
54 "host": get_host(environ, trusted_hosts),
55 }
57 if not host_only:
58 parts["root_path"] = environ.get("SCRIPT_NAME", "")
60 if not root_only:
61 parts["path"] = environ.get("PATH_INFO", "")
63 if not strip_querystring:
64 parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")
66 return _sansio_utils.get_current_url(**parts)
69def _get_server(
70 environ: WSGIEnvironment,
71) -> tuple[str, int | None] | None:
72 name = environ.get("SERVER_NAME")
74 if name is None:
75 return None
77 try:
78 port: int | None = int(environ.get("SERVER_PORT", None)) # type: ignore[arg-type]
79 except (TypeError, ValueError):
80 # unix socket
81 port = None
83 return name, port
86def get_host(
87 environ: WSGIEnvironment, trusted_hosts: t.Collection[str] | None = None
88) -> str:
89 """Get and validate a request's ``host:port`` based on the values in the
90 given WSGI environ.
92 The ``Host`` header sent by the client is preferred. Otherwise, the server's
93 configured address is used. If the server address is a Unix socket, it is
94 ignored. The port is omitted if it matches the standard HTTP or HTTPS ports.
96 The value is passed through :func:`host_is_trusted`. The host must be made
97 up of valid characters, but this does not check validity beyond that. If a
98 list of trusted domains is given, the domain must match one.
100 :param environ: The WSGI environ.
101 :param trusted_hosts: A list of trusted domains to match. These should
102 already be IDNA encoded, but will be encoded if needed. The port is
103 ignored for this check. If a name starts with a dot it will match as a
104 suffix, accepting all subdomains. If empty or ``None``, all domains are
105 allowed.
107 :return: Host, with port if necessary.
108 :raise .SecurityError: If the host is not trusted.
110 .. versionchanged:: 3.2
111 The characters of the host value are validated. The empty string is no
112 longer allowed if no header value is available.
114 .. versionchanged:: 3.2
115 When using the server address, Unix sockets are ignored.
117 .. versionchanged:: 3.1.3
118 If ``SERVER_NAME`` is IPv6, it is wrapped in ``[]``.
119 """
120 return _sansio_utils.get_host(
121 environ["wsgi.url_scheme"],
122 environ.get("HTTP_HOST"),
123 _get_server(environ),
124 trusted_hosts,
125 )
128def get_content_length(environ: WSGIEnvironment) -> int | None:
129 """Return the ``Content-Length`` header value as an int. If the header is not given
130 or the ``Transfer-Encoding`` header is ``chunked``, ``None`` is returned to indicate
131 a streaming request. If the value is not an integer, or negative, 0 is returned.
133 :param environ: The WSGI environ to get the content length from.
135 .. versionadded:: 0.9
136 """
137 return _sansio_utils.get_content_length(
138 http_content_length=environ.get("CONTENT_LENGTH"),
139 http_transfer_encoding=environ.get("HTTP_TRANSFER_ENCODING"),
140 )
143def get_input_stream(
144 environ: WSGIEnvironment,
145 safe_fallback: bool = True,
146 max_content_length: int | None = None,
147) -> t.IO[bytes]:
148 """Return the WSGI input stream, wrapped so that it may be read safely without going
149 past the ``Content-Length`` header value or ``max_content_length``.
151 If ``Content-Length`` exceeds ``max_content_length``, a
152 :exc:`RequestEntityTooLarge`` ``413 Content Too Large`` error is raised.
154 If the WSGI server sets ``environ["wsgi.input_terminated"]``, it indicates that the
155 server handles terminating the stream, so it is safe to read directly. For example,
156 a server that knows how to handle chunked requests safely would set this.
158 If ``max_content_length`` is set, it can be enforced on streams if
159 ``wsgi.input_terminated`` is set. Otherwise, an empty stream is returned unless the
160 user explicitly disables this safe fallback.
162 If the limit is reached before the underlying stream is exhausted (such as a file
163 that is too large, or an infinite stream), the remaining contents of the stream
164 cannot be read safely. Depending on how the server handles this, clients may show a
165 "connection reset" failure instead of seeing the 413 response.
167 :param environ: The WSGI environ containing the stream.
168 :param safe_fallback: Return an empty stream when ``Content-Length`` is not set.
169 Disabling this allows infinite streams, which can be a denial-of-service risk.
170 :param max_content_length: The maximum length that content-length or streaming
171 requests may not exceed.
173 .. versionchanged:: 2.3.2
174 ``max_content_length`` is only applied to streaming requests if the server sets
175 ``wsgi.input_terminated``.
177 .. versionchanged:: 2.3
178 Check ``max_content_length`` and raise an error if it is exceeded.
180 .. versionadded:: 0.9
181 """
182 stream = t.cast(t.IO[bytes], environ["wsgi.input"])
183 content_length = get_content_length(environ)
185 if content_length is not None and max_content_length is not None:
186 if content_length > max_content_length:
187 raise RequestEntityTooLarge()
189 # A WSGI server can set this to indicate that it terminates the input stream. In
190 # that case the stream is safe without wrapping, or can enforce a max length.
191 if "wsgi.input_terminated" in environ:
192 if max_content_length is not None:
193 # If this is moved above, it can cause the stream to hang if a read attempt
194 # is made when the client sends no data. For example, the development server
195 # does not handle buffering except for chunked encoding.
196 return t.cast(
197 t.IO[bytes], LimitedStream(stream, max_content_length, is_max=True)
198 )
200 return stream
202 # No limit given, return an empty stream unless the user explicitly allows the
203 # potentially infinite stream. An infinite stream is dangerous if it's not expected,
204 # as it can tie up a worker indefinitely.
205 if content_length is None:
206 return io.BytesIO() if safe_fallback else stream
208 return t.cast(t.IO[bytes], LimitedStream(stream, content_length))
211def get_path_info(environ: WSGIEnvironment) -> str:
212 """Return ``PATH_INFO`` from the WSGI environment.
214 :param environ: WSGI environment to get the path from.
216 .. versionchanged:: 3.0
217 The ``charset`` and ``errors`` parameters were removed.
219 .. versionadded:: 0.9
220 """
221 path: bytes = environ.get("PATH_INFO", "").encode("latin1")
222 return path.decode(errors="replace")
225class ClosingIterator:
226 """The WSGI specification requires that all middlewares and gateways
227 respect the `close` callback of the iterable returned by the application.
228 Because it is useful to add another close action to a returned iterable
229 and adding a custom iterable is a boring task this class can be used for
230 that::
232 return ClosingIterator(app(environ, start_response), [cleanup_session,
233 cleanup_locals])
235 If there is just one close function it can be passed instead of the list.
237 A closing iterator is not needed if the application uses response objects
238 and finishes the processing if the response is started::
240 try:
241 return response(environ, start_response)
242 finally:
243 cleanup_session()
244 cleanup_locals()
245 """
247 def __init__(
248 self,
249 iterable: t.Iterable[bytes],
250 callbacks: None
251 | (t.Callable[[], None] | t.Iterable[t.Callable[[], None]]) = None,
252 ) -> None:
253 iterator = iter(iterable)
254 self._next = t.cast(t.Callable[[], bytes], partial(next, iterator))
255 if callbacks is None:
256 callbacks = []
257 elif callable(callbacks):
258 callbacks = [callbacks]
259 else:
260 callbacks = list(callbacks)
261 iterable_close = getattr(iterable, "close", None)
262 if iterable_close:
263 callbacks.insert(0, iterable_close)
264 self._callbacks = callbacks
266 def __iter__(self) -> ClosingIterator:
267 return self
269 def __next__(self) -> bytes:
270 return self._next()
272 def close(self) -> None:
273 for callback in self._callbacks:
274 callback()
277def wrap_file(
278 environ: WSGIEnvironment, file: t.IO[bytes], buffer_size: int = 8192
279) -> t.Iterable[bytes]:
280 """Wraps a file. This uses the WSGI server's file wrapper if available
281 or otherwise the generic :class:`FileWrapper`.
283 .. versionadded:: 0.5
285 If the file wrapper from the WSGI server is used it's important to not
286 iterate over it from inside the application but to pass it through
287 unchanged. If you want to pass out a file wrapper inside a response
288 object you have to set :attr:`Response.direct_passthrough` to `True`.
290 More information about file wrappers are available in :pep:`333`.
292 :param file: a :class:`file`-like object with a :meth:`~file.read` method.
293 :param buffer_size: number of bytes for one iteration.
294 """
295 return environ.get("wsgi.file_wrapper", FileWrapper)( # type: ignore
296 file, buffer_size
297 )
300class FileWrapper:
301 """This class can be used to convert a :class:`file`-like object into
302 an iterable. It yields `buffer_size` blocks until the file is fully
303 read.
305 You should not use this class directly but rather use the
306 :func:`wrap_file` function that uses the WSGI server's file wrapper
307 support if it's available.
309 .. versionadded:: 0.5
311 If you're using this object together with a :class:`Response` you have
312 to use the `direct_passthrough` mode.
314 :param file: a :class:`file`-like object with a :meth:`~file.read` method.
315 :param buffer_size: number of bytes for one iteration.
316 """
318 def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
319 self.file = file
320 self.buffer_size = buffer_size
322 def close(self) -> None:
323 if hasattr(self.file, "close"):
324 self.file.close()
326 def seekable(self) -> bool:
327 if hasattr(self.file, "seekable"):
328 return self.file.seekable()
329 if hasattr(self.file, "seek"):
330 return True
331 return False
333 def seek(self, *args: t.Any) -> None:
334 if hasattr(self.file, "seek"):
335 self.file.seek(*args)
337 def tell(self) -> int | None:
338 if hasattr(self.file, "tell"):
339 return self.file.tell()
340 return None
342 def __iter__(self) -> FileWrapper:
343 return self
345 def __next__(self) -> bytes:
346 data = self.file.read(self.buffer_size)
347 if data:
348 return data
349 raise StopIteration()
352class _RangeWrapper:
353 # private for now, but should we make it public in the future ?
355 """This class can be used to convert an iterable object into
356 an iterable that will only yield a piece of the underlying content.
357 It yields blocks until the underlying stream range is fully read.
358 The yielded blocks will have a size that can't exceed the original
359 iterator defined block size, but that can be smaller.
361 If you're using this object together with a :class:`Response` you have
362 to use the `direct_passthrough` mode.
364 :param iterable: an iterable object with a :meth:`__next__` method.
365 :param start_byte: byte from which read will start.
366 :param byte_range: how many bytes to read.
367 """
369 def __init__(
370 self,
371 iterable: t.Iterable[bytes] | t.IO[bytes],
372 start_byte: int = 0,
373 byte_range: int | None = None,
374 ):
375 self.iterable = iter(iterable)
376 self.byte_range = byte_range
377 self.start_byte = start_byte
378 self.end_byte = None
380 if byte_range is not None:
381 self.end_byte = start_byte + byte_range
383 self.read_length = 0
384 self.seekable = hasattr(iterable, "seekable") and iterable.seekable()
385 self.end_reached = False
387 def __iter__(self) -> _RangeWrapper:
388 return self
390 def _next_chunk(self) -> bytes:
391 try:
392 chunk = next(self.iterable)
393 self.read_length += len(chunk)
394 return chunk
395 except StopIteration:
396 self.end_reached = True
397 raise
399 def _first_iteration(self) -> tuple[bytes | None, int]:
400 chunk = None
401 if self.seekable:
402 self.iterable.seek(self.start_byte) # type: ignore
403 self.read_length = self.iterable.tell() # type: ignore
404 contextual_read_length = self.read_length
405 else:
406 while self.read_length <= self.start_byte:
407 chunk = self._next_chunk()
408 if chunk is not None:
409 chunk = chunk[self.start_byte - self.read_length :]
410 contextual_read_length = self.start_byte
411 return chunk, contextual_read_length
413 def _next(self) -> bytes:
414 if self.end_reached:
415 raise StopIteration()
416 chunk = None
417 contextual_read_length = self.read_length
418 if self.read_length == 0:
419 chunk, contextual_read_length = self._first_iteration()
420 if chunk is None:
421 chunk = self._next_chunk()
422 if self.end_byte is not None and self.read_length >= self.end_byte:
423 self.end_reached = True
424 return chunk[: self.end_byte - contextual_read_length]
425 return chunk
427 def __next__(self) -> bytes:
428 chunk = self._next()
429 if chunk:
430 return chunk
431 self.end_reached = True
432 raise StopIteration()
434 def close(self) -> None:
435 if hasattr(self.iterable, "close"):
436 self.iterable.close()
439class LimitedStream(io.RawIOBase):
440 """Wrap a stream so that it doesn't read more than a given limit. This is used to
441 limit ``wsgi.input`` to the ``Content-Length`` header value or
442 :attr:`.Request.max_content_length`.
444 When attempting to read after the limit has been reached, :meth:`on_exhausted` is
445 called. When the limit is a maximum, this raises :exc:`.RequestEntityTooLarge`.
447 If reading from the stream returns zero bytes or raises an error,
448 :meth:`on_disconnect` is called, which raises :exc:`.ClientDisconnected`. When the
449 limit is a maximum and zero bytes were read, no error is raised, since it may be the
450 end of the stream.
452 If the limit is reached before the underlying stream is exhausted (such as a file
453 that is too large, or an infinite stream), the remaining contents of the stream
454 cannot be read safely. Depending on how the server handles this, clients may show a
455 "connection reset" failure instead of seeing the 413 response.
457 :param stream: The stream to read from. Must be a readable binary IO object.
458 :param limit: The limit in bytes to not read past. Should be either the
459 ``Content-Length`` header value or ``request.max_content_length``.
460 :param is_max: Whether the given ``limit`` is ``request.max_content_length`` instead
461 of the ``Content-Length`` header value. This changes how exhausted and
462 disconnect events are handled.
464 .. versionchanged:: 2.3
465 Handle ``max_content_length`` differently than ``Content-Length``.
467 .. versionchanged:: 2.3
468 Implements ``io.RawIOBase`` rather than ``io.IOBase``.
469 """
471 def __init__(self, stream: t.IO[bytes], limit: int, is_max: bool = False) -> None:
472 self._stream = stream
473 self._pos = 0
474 self.limit = limit
475 self._limit_is_max = is_max
477 @property
478 def is_exhausted(self) -> bool:
479 """Whether the current stream position has reached the limit."""
480 return self._pos >= self.limit
482 def on_exhausted(self) -> None:
483 """Called when attempting to read after the limit has been reached.
485 The default behavior is to do nothing, unless the limit is a maximum, in which
486 case it raises :exc:`.RequestEntityTooLarge`.
488 .. versionchanged:: 2.3
489 Raises ``RequestEntityTooLarge`` if the limit is a maximum.
491 .. versionchanged:: 2.3
492 Any return value is ignored.
493 """
494 if self._limit_is_max:
495 raise RequestEntityTooLarge()
497 def on_disconnect(self, error: Exception | None = None) -> None:
498 """Called when an attempted read receives zero bytes before the limit was
499 reached. This indicates that the client disconnected before sending the full
500 request body.
502 The default behavior is to raise :exc:`.ClientDisconnected`, unless the limit is
503 a maximum and no error was raised.
505 .. versionchanged:: 2.3
506 Added the ``error`` parameter. Do nothing if the limit is a maximum and no
507 error was raised.
509 .. versionchanged:: 2.3
510 Any return value is ignored.
511 """
512 if not self._limit_is_max or error is not None:
513 raise ClientDisconnected()
515 # If the limit is a maximum, then we may have read zero bytes because the
516 # streaming body is complete. There's no way to distinguish that from the
517 # client disconnecting early.
519 def exhaust(self) -> bytes:
520 """Exhaust the stream by reading until the limit is reached or the client
521 disconnects, returning the remaining data.
523 .. versionchanged:: 2.3
524 Return the remaining data.
526 .. versionchanged:: 2.2.3
527 Handle case where wrapped stream returns fewer bytes than requested.
528 """
529 if not self.is_exhausted:
530 return self.readall()
532 return b""
534 def readinto(self, b: bytearray) -> int | None: # type: ignore[override]
535 size = len(b)
536 remaining = self.limit - self._pos
538 if remaining <= 0:
539 self.on_exhausted()
540 return 0
542 if hasattr(self._stream, "readinto"):
543 # Use stream.readinto if it's available.
544 if size <= remaining:
545 # The size fits in the remaining limit, use the buffer directly.
546 try:
547 out_size: int | None = self._stream.readinto(b)
548 except (OSError, ValueError) as e:
549 self.on_disconnect(error=e)
550 return 0
551 else:
552 # Use a temp buffer with the remaining limit as the size.
553 temp_b = bytearray(remaining)
555 try:
556 out_size = self._stream.readinto(temp_b)
557 except (OSError, ValueError) as e:
558 self.on_disconnect(error=e)
559 return 0
561 if out_size:
562 b[:out_size] = temp_b
563 else:
564 # WSGI requires that stream.read is available.
565 try:
566 data = self._stream.read(min(size, remaining))
567 except (OSError, ValueError) as e:
568 self.on_disconnect(error=e)
569 return 0
571 out_size = len(data)
572 b[:out_size] = data
574 if not out_size:
575 # Read zero bytes from the stream.
576 self.on_disconnect()
577 return 0
579 self._pos += out_size
580 return out_size
582 def readall(self) -> bytes:
583 if self.is_exhausted:
584 self.on_exhausted()
585 return b""
587 out = bytearray()
589 # The parent implementation uses "while True", which results in an extra read.
590 while not self.is_exhausted:
591 data = self.read(1024 * 64)
593 # Stream may return empty before a max limit is reached.
594 if not data:
595 break
597 out.extend(data)
599 return bytes(out)
601 def tell(self) -> int:
602 """Return the current stream position.
604 .. versionadded:: 0.9
605 """
606 return self._pos
608 def readable(self) -> bool:
609 return True