Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/test.py: 56%
602 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
1from __future__ import annotations
3import dataclasses
4import mimetypes
5import sys
6import typing as t
7from collections import defaultdict
8from datetime import datetime
9from io import BytesIO
10from itertools import chain
11from random import random
12from tempfile import TemporaryFile
13from time import time
14from urllib.parse import unquote
15from urllib.parse import urlsplit
16from urllib.parse import urlunsplit
18from ._internal import _get_environ
19from ._internal import _wsgi_decoding_dance
20from ._internal import _wsgi_encoding_dance
21from .datastructures import Authorization
22from .datastructures import CallbackDict
23from .datastructures import CombinedMultiDict
24from .datastructures import EnvironHeaders
25from .datastructures import FileMultiDict
26from .datastructures import Headers
27from .datastructures import MultiDict
28from .http import dump_cookie
29from .http import dump_options_header
30from .http import parse_cookie
31from .http import parse_date
32from .http import parse_options_header
33from .sansio.multipart import Data
34from .sansio.multipart import Epilogue
35from .sansio.multipart import Field
36from .sansio.multipart import File
37from .sansio.multipart import MultipartEncoder
38from .sansio.multipart import Preamble
39from .urls import _urlencode
40from .urls import iri_to_uri
41from .utils import cached_property
42from .utils import get_content_type
43from .wrappers.request import Request
44from .wrappers.response import Response
45from .wsgi import ClosingIterator
46from .wsgi import get_current_url
48if t.TYPE_CHECKING:
49 from _typeshed.wsgi import WSGIApplication
50 from _typeshed.wsgi import WSGIEnvironment
51 import typing_extensions as te
54def stream_encode_multipart(
55 data: t.Mapping[str, t.Any],
56 use_tempfile: bool = True,
57 threshold: int = 1024 * 500,
58 boundary: str | None = None,
59) -> tuple[t.IO[bytes], int, str]:
60 """Encode a dict of values (either strings or file descriptors or
61 :class:`FileStorage` objects.) into a multipart encoded string stored
62 in a file descriptor.
64 .. versionchanged:: 3.0
65 The ``charset`` parameter was removed.
66 """
67 if boundary is None:
68 boundary = f"---------------WerkzeugFormPart_{time()}{random()}"
70 stream: t.IO[bytes] = BytesIO()
71 total_length = 0
72 on_disk = False
73 write_binary: t.Callable[[bytes], int]
75 if use_tempfile:
77 def write_binary(s: bytes) -> int:
78 nonlocal stream, total_length, on_disk
80 if on_disk:
81 return stream.write(s)
82 else:
83 length = len(s)
85 if length + total_length <= threshold:
86 stream.write(s)
87 else:
88 new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
89 new_stream.write(stream.getvalue()) # type: ignore
90 new_stream.write(s)
91 stream = new_stream
92 on_disk = True
94 total_length += length
95 return length
97 else:
98 write_binary = stream.write
100 encoder = MultipartEncoder(boundary.encode())
101 write_binary(encoder.send_event(Preamble(data=b"")))
102 for key, value in _iter_data(data):
103 reader = getattr(value, "read", None)
104 if reader is not None:
105 filename = getattr(value, "filename", getattr(value, "name", None))
106 content_type = getattr(value, "content_type", None)
107 if content_type is None:
108 content_type = (
109 filename
110 and mimetypes.guess_type(filename)[0]
111 or "application/octet-stream"
112 )
113 headers = value.headers
114 headers.update([("Content-Type", content_type)])
115 if filename is None:
116 write_binary(encoder.send_event(Field(name=key, headers=headers)))
117 else:
118 write_binary(
119 encoder.send_event(
120 File(name=key, filename=filename, headers=headers)
121 )
122 )
123 while True:
124 chunk = reader(16384)
126 if not chunk:
127 write_binary(encoder.send_event(Data(data=chunk, more_data=False)))
128 break
130 write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
131 else:
132 if not isinstance(value, str):
133 value = str(value)
134 write_binary(encoder.send_event(Field(name=key, headers=Headers())))
135 write_binary(encoder.send_event(Data(data=value.encode(), more_data=False)))
137 write_binary(encoder.send_event(Epilogue(data=b"")))
139 length = stream.tell()
140 stream.seek(0)
141 return stream, length, boundary
144def encode_multipart(
145 values: t.Mapping[str, t.Any], boundary: str | None = None
146) -> tuple[str, bytes]:
147 """Like `stream_encode_multipart` but returns a tuple in the form
148 (``boundary``, ``data``) where data is bytes.
150 .. versionchanged:: 3.0
151 The ``charset`` parameter was removed.
152 """
153 stream, length, boundary = stream_encode_multipart(
154 values, use_tempfile=False, boundary=boundary
155 )
156 return boundary, stream.read()
159def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[tuple[str, t.Any]]:
160 """Iterate over a mapping that might have a list of values, yielding
161 all key, value pairs. Almost like iter_multi_items but only allows
162 lists, not tuples, of values so tuples can be used for files.
163 """
164 if isinstance(data, MultiDict):
165 yield from data.items(multi=True)
166 else:
167 for key, value in data.items():
168 if isinstance(value, list):
169 for v in value:
170 yield key, v
171 else:
172 yield key, value
175_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound=MultiDict)
178class EnvironBuilder:
179 """This class can be used to conveniently create a WSGI environment
180 for testing purposes. It can be used to quickly create WSGI environments
181 or request objects from arbitrary data.
183 The signature of this class is also used in some other places as of
184 Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
185 :meth:`Client.open`). Because of this most of the functionality is
186 available through the constructor alone.
188 Files and regular form data can be manipulated independently of each
189 other with the :attr:`form` and :attr:`files` attributes, but are
190 passed with the same argument to the constructor: `data`.
192 `data` can be any of these values:
194 - a `str` or `bytes` object: The object is converted into an
195 :attr:`input_stream`, the :attr:`content_length` is set and you have to
196 provide a :attr:`content_type`.
197 - a `dict` or :class:`MultiDict`: The keys have to be strings. The values
198 have to be either any of the following objects, or a list of any of the
199 following objects:
201 - a :class:`file`-like object: These are converted into
202 :class:`FileStorage` objects automatically.
203 - a `tuple`: The :meth:`~FileMultiDict.add_file` method is called
204 with the key and the unpacked `tuple` items as positional
205 arguments.
206 - a `str`: The string is set as form data for the associated key.
207 - a file-like object: The object content is loaded in memory and then
208 handled like a regular `str` or a `bytes`.
210 :param path: the path of the request. In the WSGI environment this will
211 end up as `PATH_INFO`. If the `query_string` is not defined
212 and there is a question mark in the `path` everything after
213 it is used as query string.
214 :param base_url: the base URL is a URL that is used to extract the WSGI
215 URL scheme, host (server name + server port) and the
216 script root (`SCRIPT_NAME`).
217 :param query_string: an optional string or dict with URL parameters.
218 :param method: the HTTP method to use, defaults to `GET`.
219 :param input_stream: an optional input stream. Do not specify this and
220 `data`. As soon as an input stream is set you can't
221 modify :attr:`args` and :attr:`files` unless you
222 set the :attr:`input_stream` to `None` again.
223 :param content_type: The content type for the request. As of 0.5 you
224 don't have to provide this when specifying files
225 and form data via `data`.
226 :param content_length: The content length for the request. You don't
227 have to specify this when providing data via
228 `data`.
229 :param errors_stream: an optional error stream that is used for
230 `wsgi.errors`. Defaults to :data:`stderr`.
231 :param multithread: controls `wsgi.multithread`. Defaults to `False`.
232 :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`.
233 :param run_once: controls `wsgi.run_once`. Defaults to `False`.
234 :param headers: an optional list or :class:`Headers` object of headers.
235 :param data: a string or dict of form data or a file-object.
236 See explanation above.
237 :param json: An object to be serialized and assigned to ``data``.
238 Defaults the content type to ``"application/json"``.
239 Serialized with the function assigned to :attr:`json_dumps`.
240 :param environ_base: an optional dict of environment defaults.
241 :param environ_overrides: an optional dict of environment overrides.
242 :param auth: An authorization object to use for the
243 ``Authorization`` header value. A ``(username, password)`` tuple
244 is a shortcut for ``Basic`` authorization.
246 .. versionchanged:: 3.0
247 The ``charset`` parameter was removed.
249 .. versionchanged:: 2.1
250 ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
251 header keys in the environ.
253 .. versionchanged:: 2.0
254 ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
255 the query string, not only the path.
257 .. versionchanged:: 2.0
258 The default :attr:`request_class` is ``Request`` instead of
259 ``BaseRequest``.
261 .. versionadded:: 2.0
262 Added the ``auth`` parameter.
264 .. versionadded:: 0.15
265 The ``json`` param and :meth:`json_dumps` method.
267 .. versionadded:: 0.15
268 The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
269 the path before percent-decoding. This is not part of the WSGI
270 PEP, but many WSGI servers include it.
272 .. versionchanged:: 0.6
273 ``path`` and ``base_url`` can now be unicode strings that are
274 encoded with :func:`iri_to_uri`.
275 """
277 #: the server protocol to use. defaults to HTTP/1.1
278 server_protocol = "HTTP/1.1"
280 #: the wsgi version to use. defaults to (1, 0)
281 wsgi_version = (1, 0)
283 #: The default request class used by :meth:`get_request`.
284 request_class = Request
286 import json
288 #: The serialization function used when ``json`` is passed.
289 json_dumps = staticmethod(json.dumps)
290 del json
292 _args: MultiDict | None
293 _query_string: str | None
294 _input_stream: t.IO[bytes] | None
295 _form: MultiDict | None
296 _files: FileMultiDict | None
298 def __init__(
299 self,
300 path: str = "/",
301 base_url: str | None = None,
302 query_string: t.Mapping[str, str] | str | None = None,
303 method: str = "GET",
304 input_stream: t.IO[bytes] | None = None,
305 content_type: str | None = None,
306 content_length: int | None = None,
307 errors_stream: t.IO[str] | None = None,
308 multithread: bool = False,
309 multiprocess: bool = False,
310 run_once: bool = False,
311 headers: Headers | t.Iterable[tuple[str, str]] | None = None,
312 data: None | (t.IO[bytes] | str | bytes | t.Mapping[str, t.Any]) = None,
313 environ_base: t.Mapping[str, t.Any] | None = None,
314 environ_overrides: t.Mapping[str, t.Any] | None = None,
315 mimetype: str | None = None,
316 json: t.Mapping[str, t.Any] | None = None,
317 auth: Authorization | tuple[str, str] | None = None,
318 ) -> None:
319 if query_string is not None and "?" in path:
320 raise ValueError("Query string is defined in the path and as an argument")
321 request_uri = urlsplit(path)
322 if query_string is None and "?" in path:
323 query_string = request_uri.query
325 self.path = iri_to_uri(request_uri.path)
326 self.request_uri = path
327 if base_url is not None:
328 base_url = iri_to_uri(base_url)
329 self.base_url = base_url # type: ignore
330 if isinstance(query_string, str):
331 self.query_string = query_string
332 else:
333 if query_string is None:
334 query_string = MultiDict()
335 elif not isinstance(query_string, MultiDict):
336 query_string = MultiDict(query_string)
337 self.args = query_string
338 self.method = method
339 if headers is None:
340 headers = Headers()
341 elif not isinstance(headers, Headers):
342 headers = Headers(headers)
343 self.headers = headers
344 if content_type is not None:
345 self.content_type = content_type
346 if errors_stream is None:
347 errors_stream = sys.stderr
348 self.errors_stream = errors_stream
349 self.multithread = multithread
350 self.multiprocess = multiprocess
351 self.run_once = run_once
352 self.environ_base = environ_base
353 self.environ_overrides = environ_overrides
354 self.input_stream = input_stream
355 self.content_length = content_length
356 self.closed = False
358 if auth is not None:
359 if isinstance(auth, tuple):
360 auth = Authorization(
361 "basic", {"username": auth[0], "password": auth[1]}
362 )
364 self.headers.set("Authorization", auth.to_header())
366 if json is not None:
367 if data is not None:
368 raise TypeError("can't provide both json and data")
370 data = self.json_dumps(json)
372 if self.content_type is None:
373 self.content_type = "application/json"
375 if data:
376 if input_stream is not None:
377 raise TypeError("can't provide input stream and data")
378 if hasattr(data, "read"):
379 data = data.read()
380 if isinstance(data, str):
381 data = data.encode()
382 if isinstance(data, bytes):
383 self.input_stream = BytesIO(data)
384 if self.content_length is None:
385 self.content_length = len(data)
386 else:
387 for key, value in _iter_data(data):
388 if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
389 self._add_file_from_data(key, value)
390 else:
391 self.form.setlistdefault(key).append(value)
393 if mimetype is not None:
394 self.mimetype = mimetype
396 @classmethod
397 def from_environ(cls, environ: WSGIEnvironment, **kwargs: t.Any) -> EnvironBuilder:
398 """Turn an environ dict back into a builder. Any extra kwargs
399 override the args extracted from the environ.
401 .. versionchanged:: 2.0
402 Path and query values are passed through the WSGI decoding
403 dance to avoid double encoding.
405 .. versionadded:: 0.15
406 """
407 headers = Headers(EnvironHeaders(environ))
408 out = {
409 "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
410 "base_url": cls._make_base_url(
411 environ["wsgi.url_scheme"],
412 headers.pop("Host"),
413 _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
414 ),
415 "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
416 "method": environ["REQUEST_METHOD"],
417 "input_stream": environ["wsgi.input"],
418 "content_type": headers.pop("Content-Type", None),
419 "content_length": headers.pop("Content-Length", None),
420 "errors_stream": environ["wsgi.errors"],
421 "multithread": environ["wsgi.multithread"],
422 "multiprocess": environ["wsgi.multiprocess"],
423 "run_once": environ["wsgi.run_once"],
424 "headers": headers,
425 }
426 out.update(kwargs)
427 return cls(**out)
429 def _add_file_from_data(
430 self,
431 key: str,
432 value: (t.IO[bytes] | tuple[t.IO[bytes], str] | tuple[t.IO[bytes], str, str]),
433 ) -> None:
434 """Called in the EnvironBuilder to add files from the data dict."""
435 if isinstance(value, tuple):
436 self.files.add_file(key, *value)
437 else:
438 self.files.add_file(key, value)
440 @staticmethod
441 def _make_base_url(scheme: str, host: str, script_root: str) -> str:
442 return urlunsplit((scheme, host, script_root, "", "")).rstrip("/") + "/"
444 @property
445 def base_url(self) -> str:
446 """The base URL is used to extract the URL scheme, host name,
447 port, and root path.
448 """
449 return self._make_base_url(self.url_scheme, self.host, self.script_root)
451 @base_url.setter
452 def base_url(self, value: str | None) -> None:
453 if value is None:
454 scheme = "http"
455 netloc = "localhost"
456 script_root = ""
457 else:
458 scheme, netloc, script_root, qs, anchor = urlsplit(value)
459 if qs or anchor:
460 raise ValueError("base url must not contain a query string or fragment")
461 self.script_root = script_root.rstrip("/")
462 self.host = netloc
463 self.url_scheme = scheme
465 @property
466 def content_type(self) -> str | None:
467 """The content type for the request. Reflected from and to
468 the :attr:`headers`. Do not set if you set :attr:`files` or
469 :attr:`form` for auto detection.
470 """
471 ct = self.headers.get("Content-Type")
472 if ct is None and not self._input_stream:
473 if self._files:
474 return "multipart/form-data"
475 if self._form:
476 return "application/x-www-form-urlencoded"
477 return None
478 return ct
480 @content_type.setter
481 def content_type(self, value: str | None) -> None:
482 if value is None:
483 self.headers.pop("Content-Type", None)
484 else:
485 self.headers["Content-Type"] = value
487 @property
488 def mimetype(self) -> str | None:
489 """The mimetype (content type without charset etc.)
491 .. versionadded:: 0.14
492 """
493 ct = self.content_type
494 return ct.split(";")[0].strip() if ct else None
496 @mimetype.setter
497 def mimetype(self, value: str) -> None:
498 self.content_type = get_content_type(value, "utf-8")
500 @property
501 def mimetype_params(self) -> t.Mapping[str, str]:
502 """The mimetype parameters as dict. For example if the
503 content type is ``text/html; charset=utf-8`` the params would be
504 ``{'charset': 'utf-8'}``.
506 .. versionadded:: 0.14
507 """
509 def on_update(d: CallbackDict) -> None:
510 self.headers["Content-Type"] = dump_options_header(self.mimetype, d)
512 d = parse_options_header(self.headers.get("content-type", ""))[1]
513 return CallbackDict(d, on_update)
515 @property
516 def content_length(self) -> int | None:
517 """The content length as integer. Reflected from and to the
518 :attr:`headers`. Do not set if you set :attr:`files` or
519 :attr:`form` for auto detection.
520 """
521 return self.headers.get("Content-Length", type=int)
523 @content_length.setter
524 def content_length(self, value: int | None) -> None:
525 if value is None:
526 self.headers.pop("Content-Length", None)
527 else:
528 self.headers["Content-Length"] = str(value)
530 def _get_form(self, name: str, storage: type[_TAnyMultiDict]) -> _TAnyMultiDict:
531 """Common behavior for getting the :attr:`form` and
532 :attr:`files` properties.
534 :param name: Name of the internal cached attribute.
535 :param storage: Storage class used for the data.
536 """
537 if self.input_stream is not None:
538 raise AttributeError("an input stream is defined")
540 rv = getattr(self, name)
542 if rv is None:
543 rv = storage()
544 setattr(self, name, rv)
546 return rv # type: ignore
548 def _set_form(self, name: str, value: MultiDict) -> None:
549 """Common behavior for setting the :attr:`form` and
550 :attr:`files` properties.
552 :param name: Name of the internal cached attribute.
553 :param value: Value to assign to the attribute.
554 """
555 self._input_stream = None
556 setattr(self, name, value)
558 @property
559 def form(self) -> MultiDict:
560 """A :class:`MultiDict` of form values."""
561 return self._get_form("_form", MultiDict)
563 @form.setter
564 def form(self, value: MultiDict) -> None:
565 self._set_form("_form", value)
567 @property
568 def files(self) -> FileMultiDict:
569 """A :class:`FileMultiDict` of uploaded files. Use
570 :meth:`~FileMultiDict.add_file` to add new files.
571 """
572 return self._get_form("_files", FileMultiDict)
574 @files.setter
575 def files(self, value: FileMultiDict) -> None:
576 self._set_form("_files", value)
578 @property
579 def input_stream(self) -> t.IO[bytes] | None:
580 """An optional input stream. This is mutually exclusive with
581 setting :attr:`form` and :attr:`files`, setting it will clear
582 those. Do not provide this if the method is not ``POST`` or
583 another method that has a body.
584 """
585 return self._input_stream
587 @input_stream.setter
588 def input_stream(self, value: t.IO[bytes] | None) -> None:
589 self._input_stream = value
590 self._form = None
591 self._files = None
593 @property
594 def query_string(self) -> str:
595 """The query string. If you set this to a string
596 :attr:`args` will no longer be available.
597 """
598 if self._query_string is None:
599 if self._args is not None:
600 return _urlencode(self._args)
601 return ""
602 return self._query_string
604 @query_string.setter
605 def query_string(self, value: str | None) -> None:
606 self._query_string = value
607 self._args = None
609 @property
610 def args(self) -> MultiDict:
611 """The URL arguments as :class:`MultiDict`."""
612 if self._query_string is not None:
613 raise AttributeError("a query string is defined")
614 if self._args is None:
615 self._args = MultiDict()
616 return self._args
618 @args.setter
619 def args(self, value: MultiDict | None) -> None:
620 self._query_string = None
621 self._args = value
623 @property
624 def server_name(self) -> str:
625 """The server name (read-only, use :attr:`host` to set)"""
626 return self.host.split(":", 1)[0]
628 @property
629 def server_port(self) -> int:
630 """The server port as integer (read-only, use :attr:`host` to set)"""
631 pieces = self.host.split(":", 1)
633 if len(pieces) == 2:
634 try:
635 return int(pieces[1])
636 except ValueError:
637 pass
639 if self.url_scheme == "https":
640 return 443
641 return 80
643 def __del__(self) -> None:
644 try:
645 self.close()
646 except Exception:
647 pass
649 def close(self) -> None:
650 """Closes all files. If you put real :class:`file` objects into the
651 :attr:`files` dict you can call this method to automatically close
652 them all in one go.
653 """
654 if self.closed:
655 return
656 try:
657 files = self.files.values()
658 except AttributeError:
659 files = () # type: ignore
660 for f in files:
661 try:
662 f.close()
663 except Exception:
664 pass
665 self.closed = True
667 def get_environ(self) -> WSGIEnvironment:
668 """Return the built environ.
670 .. versionchanged:: 0.15
671 The content type and length headers are set based on
672 input stream detection. Previously this only set the WSGI
673 keys.
674 """
675 input_stream = self.input_stream
676 content_length = self.content_length
678 mimetype = self.mimetype
679 content_type = self.content_type
681 if input_stream is not None:
682 start_pos = input_stream.tell()
683 input_stream.seek(0, 2)
684 end_pos = input_stream.tell()
685 input_stream.seek(start_pos)
686 content_length = end_pos - start_pos
687 elif mimetype == "multipart/form-data":
688 input_stream, content_length, boundary = stream_encode_multipart(
689 CombinedMultiDict([self.form, self.files])
690 )
691 content_type = f'{mimetype}; boundary="{boundary}"'
692 elif mimetype == "application/x-www-form-urlencoded":
693 form_encoded = _urlencode(self.form).encode("ascii")
694 content_length = len(form_encoded)
695 input_stream = BytesIO(form_encoded)
696 else:
697 input_stream = BytesIO()
699 result: WSGIEnvironment = {}
700 if self.environ_base:
701 result.update(self.environ_base)
703 def _path_encode(x: str) -> str:
704 return _wsgi_encoding_dance(unquote(x))
706 raw_uri = _wsgi_encoding_dance(self.request_uri)
707 result.update(
708 {
709 "REQUEST_METHOD": self.method,
710 "SCRIPT_NAME": _path_encode(self.script_root),
711 "PATH_INFO": _path_encode(self.path),
712 "QUERY_STRING": _wsgi_encoding_dance(self.query_string),
713 # Non-standard, added by mod_wsgi, uWSGI
714 "REQUEST_URI": raw_uri,
715 # Non-standard, added by gunicorn
716 "RAW_URI": raw_uri,
717 "SERVER_NAME": self.server_name,
718 "SERVER_PORT": str(self.server_port),
719 "HTTP_HOST": self.host,
720 "SERVER_PROTOCOL": self.server_protocol,
721 "wsgi.version": self.wsgi_version,
722 "wsgi.url_scheme": self.url_scheme,
723 "wsgi.input": input_stream,
724 "wsgi.errors": self.errors_stream,
725 "wsgi.multithread": self.multithread,
726 "wsgi.multiprocess": self.multiprocess,
727 "wsgi.run_once": self.run_once,
728 }
729 )
731 headers = self.headers.copy()
732 # Don't send these as headers, they're part of the environ.
733 headers.remove("Content-Type")
734 headers.remove("Content-Length")
736 if content_type is not None:
737 result["CONTENT_TYPE"] = content_type
739 if content_length is not None:
740 result["CONTENT_LENGTH"] = str(content_length)
742 combined_headers = defaultdict(list)
744 for key, value in headers.to_wsgi_list():
745 combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)
747 for key, values in combined_headers.items():
748 result[key] = ", ".join(values)
750 if self.environ_overrides:
751 result.update(self.environ_overrides)
753 return result
755 def get_request(self, cls: type[Request] | None = None) -> Request:
756 """Returns a request with the data. If the request class is not
757 specified :attr:`request_class` is used.
759 :param cls: The request wrapper to use.
760 """
761 if cls is None:
762 cls = self.request_class
764 return cls(self.get_environ())
767class ClientRedirectError(Exception):
768 """If a redirect loop is detected when using follow_redirects=True with
769 the :cls:`Client`, then this exception is raised.
770 """
773class Client:
774 """Simulate sending requests to a WSGI application without running a WSGI or HTTP
775 server.
777 :param application: The WSGI application to make requests to.
778 :param response_wrapper: A :class:`.Response` class to wrap response data with.
779 Defaults to :class:`.TestResponse`. If it's not a subclass of ``TestResponse``,
780 one will be created.
781 :param use_cookies: Persist cookies from ``Set-Cookie`` response headers to the
782 ``Cookie`` header in subsequent requests. Domain and path matching is supported,
783 but other cookie parameters are ignored.
784 :param allow_subdomain_redirects: Allow requests to follow redirects to subdomains.
785 Enable this if the application handles subdomains and redirects between them.
787 .. versionchanged:: 2.3
788 Simplify cookie implementation, support domain and path matching.
790 .. versionchanged:: 2.1
791 All data is available as properties on the returned response object. The
792 response cannot be returned as a tuple.
794 .. versionchanged:: 2.0
795 ``response_wrapper`` is always a subclass of :class:``TestResponse``.
797 .. versionchanged:: 0.5
798 Added the ``use_cookies`` parameter.
799 """
801 def __init__(
802 self,
803 application: WSGIApplication,
804 response_wrapper: type[Response] | None = None,
805 use_cookies: bool = True,
806 allow_subdomain_redirects: bool = False,
807 ) -> None:
808 self.application = application
810 if response_wrapper in {None, Response}:
811 response_wrapper = TestResponse
812 elif not isinstance(response_wrapper, TestResponse):
813 response_wrapper = type(
814 "WrapperTestResponse",
815 (TestResponse, response_wrapper), # type: ignore
816 {},
817 )
819 self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper)
821 if use_cookies:
822 self._cookies: dict[tuple[str, str, str], Cookie] | None = {}
823 else:
824 self._cookies = None
826 self.allow_subdomain_redirects = allow_subdomain_redirects
828 def get_cookie(
829 self, key: str, domain: str = "localhost", path: str = "/"
830 ) -> Cookie | None:
831 """Return a :class:`.Cookie` if it exists. Cookies are uniquely identified by
832 ``(domain, path, key)``.
834 :param key: The decoded form of the key for the cookie.
835 :param domain: The domain the cookie was set for.
836 :param path: The path the cookie was set for.
838 .. versionadded:: 2.3
839 """
840 if self._cookies is None:
841 raise TypeError(
842 "Cookies are disabled. Create a client with 'use_cookies=True'."
843 )
845 return self._cookies.get((domain, path, key))
847 def set_cookie(
848 self,
849 key: str,
850 value: str = "",
851 *,
852 domain: str = "localhost",
853 origin_only: bool = True,
854 path: str = "/",
855 **kwargs: t.Any,
856 ) -> None:
857 """Set a cookie to be sent in subsequent requests.
859 This is a convenience to skip making a test request to a route that would set
860 the cookie. To test the cookie, make a test request to a route that uses the
861 cookie value.
863 The client uses ``domain``, ``origin_only``, and ``path`` to determine which
864 cookies to send with a request. It does not use other cookie parameters that
865 browsers use, since they're not applicable in tests.
867 :param key: The key part of the cookie.
868 :param value: The value part of the cookie.
869 :param domain: Send this cookie with requests that match this domain. If
870 ``origin_only`` is true, it must be an exact match, otherwise it may be a
871 suffix match.
872 :param origin_only: Whether the domain must be an exact match to the request.
873 :param path: Send this cookie with requests that match this path either exactly
874 or as a prefix.
875 :param kwargs: Passed to :func:`.dump_cookie`.
877 .. versionchanged:: 3.0
878 The parameter ``server_name`` is removed. The first parameter is
879 ``key``. Use the ``domain`` and ``origin_only`` parameters instead.
881 .. versionchanged:: 2.3
882 The ``origin_only`` parameter was added.
884 .. versionchanged:: 2.3
885 The ``domain`` parameter defaults to ``localhost``.
886 """
887 if self._cookies is None:
888 raise TypeError(
889 "Cookies are disabled. Create a client with 'use_cookies=True'."
890 )
892 cookie = Cookie._from_response_header(
893 domain, "/", dump_cookie(key, value, domain=domain, path=path, **kwargs)
894 )
895 cookie.origin_only = origin_only
897 if cookie._should_delete:
898 self._cookies.pop(cookie._storage_key, None)
899 else:
900 self._cookies[cookie._storage_key] = cookie
902 def delete_cookie(
903 self,
904 key: str,
905 *,
906 domain: str = "localhost",
907 path: str = "/",
908 ) -> None:
909 """Delete a cookie if it exists. Cookies are uniquely identified by
910 ``(domain, path, key)``.
912 :param key: The decoded form of the key for the cookie.
913 :param domain: The domain the cookie was set for.
914 :param path: The path the cookie was set for.
916 .. versionchanged:: 3.0
917 The ``server_name`` parameter is removed. The first parameter is
918 ``key``. Use the ``domain`` parameter instead.
920 .. versionchanged:: 3.0
921 The ``secure``, ``httponly`` and ``samesite`` parameters are removed.
923 .. versionchanged:: 2.3
924 The ``domain`` parameter defaults to ``localhost``.
925 """
926 if self._cookies is None:
927 raise TypeError(
928 "Cookies are disabled. Create a client with 'use_cookies=True'."
929 )
931 self._cookies.pop((domain, path, key), None)
933 def _add_cookies_to_wsgi(self, environ: WSGIEnvironment) -> None:
934 """If cookies are enabled, set the ``Cookie`` header in the environ to the
935 cookies that are applicable to the request host and path.
937 :meta private:
939 .. versionadded:: 2.3
940 """
941 if self._cookies is None:
942 return
944 url = urlsplit(get_current_url(environ))
945 server_name = url.hostname or "localhost"
946 value = "; ".join(
947 c._to_request_header()
948 for c in self._cookies.values()
949 if c._matches_request(server_name, url.path)
950 )
952 if value:
953 environ["HTTP_COOKIE"] = value
954 else:
955 environ.pop("HTTP_COOKIE", None)
957 def _update_cookies_from_response(
958 self, server_name: str, path: str, headers: list[str]
959 ) -> None:
960 """If cookies are enabled, update the stored cookies from any ``Set-Cookie``
961 headers in the response.
963 :meta private:
965 .. versionadded:: 2.3
966 """
967 if self._cookies is None:
968 return
970 for header in headers:
971 cookie = Cookie._from_response_header(server_name, path, header)
973 if cookie._should_delete:
974 self._cookies.pop(cookie._storage_key, None)
975 else:
976 self._cookies[cookie._storage_key] = cookie
978 def run_wsgi_app(
979 self, environ: WSGIEnvironment, buffered: bool = False
980 ) -> tuple[t.Iterable[bytes], str, Headers]:
981 """Runs the wrapped WSGI app with the given environment.
983 :meta private:
984 """
985 self._add_cookies_to_wsgi(environ)
986 rv = run_wsgi_app(self.application, environ, buffered=buffered)
987 url = urlsplit(get_current_url(environ))
988 self._update_cookies_from_response(
989 url.hostname or "localhost", url.path, rv[2].getlist("Set-Cookie")
990 )
991 return rv
993 def resolve_redirect(
994 self, response: TestResponse, buffered: bool = False
995 ) -> TestResponse:
996 """Perform a new request to the location given by the redirect
997 response to the previous request.
999 :meta private:
1000 """
1001 scheme, netloc, path, qs, anchor = urlsplit(response.location)
1002 builder = EnvironBuilder.from_environ(
1003 response.request.environ, path=path, query_string=qs
1004 )
1006 to_name_parts = netloc.split(":", 1)[0].split(".")
1007 from_name_parts = builder.server_name.split(".")
1009 if to_name_parts != [""]:
1010 # The new location has a host, use it for the base URL.
1011 builder.url_scheme = scheme
1012 builder.host = netloc
1013 else:
1014 # A local redirect with autocorrect_location_header=False
1015 # doesn't have a host, so use the request's host.
1016 to_name_parts = from_name_parts
1018 # Explain why a redirect to a different server name won't be followed.
1019 if to_name_parts != from_name_parts:
1020 if to_name_parts[-len(from_name_parts) :] == from_name_parts:
1021 if not self.allow_subdomain_redirects:
1022 raise RuntimeError("Following subdomain redirects is not enabled.")
1023 else:
1024 raise RuntimeError("Following external redirects is not supported.")
1026 path_parts = path.split("/")
1027 root_parts = builder.script_root.split("/")
1029 if path_parts[: len(root_parts)] == root_parts:
1030 # Strip the script root from the path.
1031 builder.path = path[len(builder.script_root) :]
1032 else:
1033 # The new location is not under the script root, so use the
1034 # whole path and clear the previous root.
1035 builder.path = path
1036 builder.script_root = ""
1038 # Only 307 and 308 preserve all of the original request.
1039 if response.status_code not in {307, 308}:
1040 # HEAD is preserved, everything else becomes GET.
1041 if builder.method != "HEAD":
1042 builder.method = "GET"
1044 # Clear the body and the headers that describe it.
1046 if builder.input_stream is not None:
1047 builder.input_stream.close()
1048 builder.input_stream = None
1050 builder.content_type = None
1051 builder.content_length = None
1052 builder.headers.pop("Transfer-Encoding", None)
1054 return self.open(builder, buffered=buffered)
1056 def open(
1057 self,
1058 *args: t.Any,
1059 buffered: bool = False,
1060 follow_redirects: bool = False,
1061 **kwargs: t.Any,
1062 ) -> TestResponse:
1063 """Generate an environ dict from the given arguments, make a
1064 request to the application using it, and return the response.
1066 :param args: Passed to :class:`EnvironBuilder` to create the
1067 environ for the request. If a single arg is passed, it can
1068 be an existing :class:`EnvironBuilder` or an environ dict.
1069 :param buffered: Convert the iterator returned by the app into
1070 a list. If the iterator has a ``close()`` method, it is
1071 called automatically.
1072 :param follow_redirects: Make additional requests to follow HTTP
1073 redirects until a non-redirect status is returned.
1074 :attr:`TestResponse.history` lists the intermediate
1075 responses.
1077 .. versionchanged:: 2.1
1078 Removed the ``as_tuple`` parameter.
1080 .. versionchanged:: 2.0
1081 The request input stream is closed when calling
1082 ``response.close()``. Input streams for redirects are
1083 automatically closed.
1085 .. versionchanged:: 0.5
1086 If a dict is provided as file in the dict for the ``data``
1087 parameter the content type has to be called ``content_type``
1088 instead of ``mimetype``. This change was made for
1089 consistency with :class:`werkzeug.FileWrapper`.
1091 .. versionchanged:: 0.5
1092 Added the ``follow_redirects`` parameter.
1093 """
1094 request: Request | None = None
1096 if not kwargs and len(args) == 1:
1097 arg = args[0]
1099 if isinstance(arg, EnvironBuilder):
1100 request = arg.get_request()
1101 elif isinstance(arg, dict):
1102 request = EnvironBuilder.from_environ(arg).get_request()
1103 elif isinstance(arg, Request):
1104 request = arg
1106 if request is None:
1107 builder = EnvironBuilder(*args, **kwargs)
1109 try:
1110 request = builder.get_request()
1111 finally:
1112 builder.close()
1114 response = self.run_wsgi_app(request.environ, buffered=buffered)
1115 response = self.response_wrapper(*response, request=request)
1117 redirects = set()
1118 history: list[TestResponse] = []
1120 if not follow_redirects:
1121 return response
1123 while response.status_code in {
1124 301,
1125 302,
1126 303,
1127 305,
1128 307,
1129 308,
1130 }:
1131 # Exhaust intermediate response bodies to ensure middleware
1132 # that returns an iterator runs any cleanup code.
1133 if not buffered:
1134 response.make_sequence()
1135 response.close()
1137 new_redirect_entry = (response.location, response.status_code)
1139 if new_redirect_entry in redirects:
1140 raise ClientRedirectError(
1141 f"Loop detected: A {response.status_code} redirect"
1142 f" to {response.location} was already made."
1143 )
1145 redirects.add(new_redirect_entry)
1146 response.history = tuple(history)
1147 history.append(response)
1148 response = self.resolve_redirect(response, buffered=buffered)
1149 else:
1150 # This is the final request after redirects.
1151 response.history = tuple(history)
1152 # Close the input stream when closing the response, in case
1153 # the input is an open temporary file.
1154 response.call_on_close(request.input_stream.close)
1155 return response
1157 def get(self, *args: t.Any, **kw: t.Any) -> TestResponse:
1158 """Call :meth:`open` with ``method`` set to ``GET``."""
1159 kw["method"] = "GET"
1160 return self.open(*args, **kw)
1162 def post(self, *args: t.Any, **kw: t.Any) -> TestResponse:
1163 """Call :meth:`open` with ``method`` set to ``POST``."""
1164 kw["method"] = "POST"
1165 return self.open(*args, **kw)
1167 def put(self, *args: t.Any, **kw: t.Any) -> TestResponse:
1168 """Call :meth:`open` with ``method`` set to ``PUT``."""
1169 kw["method"] = "PUT"
1170 return self.open(*args, **kw)
1172 def delete(self, *args: t.Any, **kw: t.Any) -> TestResponse:
1173 """Call :meth:`open` with ``method`` set to ``DELETE``."""
1174 kw["method"] = "DELETE"
1175 return self.open(*args, **kw)
1177 def patch(self, *args: t.Any, **kw: t.Any) -> TestResponse:
1178 """Call :meth:`open` with ``method`` set to ``PATCH``."""
1179 kw["method"] = "PATCH"
1180 return self.open(*args, **kw)
1182 def options(self, *args: t.Any, **kw: t.Any) -> TestResponse:
1183 """Call :meth:`open` with ``method`` set to ``OPTIONS``."""
1184 kw["method"] = "OPTIONS"
1185 return self.open(*args, **kw)
1187 def head(self, *args: t.Any, **kw: t.Any) -> TestResponse:
1188 """Call :meth:`open` with ``method`` set to ``HEAD``."""
1189 kw["method"] = "HEAD"
1190 return self.open(*args, **kw)
1192 def trace(self, *args: t.Any, **kw: t.Any) -> TestResponse:
1193 """Call :meth:`open` with ``method`` set to ``TRACE``."""
1194 kw["method"] = "TRACE"
1195 return self.open(*args, **kw)
1197 def __repr__(self) -> str:
1198 return f"<{type(self).__name__} {self.application!r}>"
1201def create_environ(*args: t.Any, **kwargs: t.Any) -> WSGIEnvironment:
1202 """Create a new WSGI environ dict based on the values passed. The first
1203 parameter should be the path of the request which defaults to '/'. The
1204 second one can either be an absolute path (in that case the host is
1205 localhost:80) or a full path to the request with scheme, netloc port and
1206 the path to the script.
1208 This accepts the same arguments as the :class:`EnvironBuilder`
1209 constructor.
1211 .. versionchanged:: 0.5
1212 This function is now a thin wrapper over :class:`EnvironBuilder` which
1213 was added in 0.5. The `headers`, `environ_base`, `environ_overrides`
1214 and `charset` parameters were added.
1215 """
1216 builder = EnvironBuilder(*args, **kwargs)
1218 try:
1219 return builder.get_environ()
1220 finally:
1221 builder.close()
1224def run_wsgi_app(
1225 app: WSGIApplication, environ: WSGIEnvironment, buffered: bool = False
1226) -> tuple[t.Iterable[bytes], str, Headers]:
1227 """Return a tuple in the form (app_iter, status, headers) of the
1228 application output. This works best if you pass it an application that
1229 returns an iterator all the time.
1231 Sometimes applications may use the `write()` callable returned
1232 by the `start_response` function. This tries to resolve such edge
1233 cases automatically. But if you don't get the expected output you
1234 should set `buffered` to `True` which enforces buffering.
1236 If passed an invalid WSGI application the behavior of this function is
1237 undefined. Never pass non-conforming WSGI applications to this function.
1239 :param app: the application to execute.
1240 :param buffered: set to `True` to enforce buffering.
1241 :return: tuple in the form ``(app_iter, status, headers)``
1242 """
1243 # Copy environ to ensure any mutations by the app (ProxyFix, for
1244 # example) don't affect subsequent requests (such as redirects).
1245 environ = _get_environ(environ).copy()
1246 status: str
1247 response: tuple[str, list[tuple[str, str]]] | None = None
1248 buffer: list[bytes] = []
1250 def start_response(status, headers, exc_info=None): # type: ignore
1251 nonlocal response
1253 if exc_info:
1254 try:
1255 raise exc_info[1].with_traceback(exc_info[2])
1256 finally:
1257 exc_info = None
1259 response = (status, headers)
1260 return buffer.append
1262 app_rv = app(environ, start_response)
1263 close_func = getattr(app_rv, "close", None)
1264 app_iter: t.Iterable[bytes] = iter(app_rv)
1266 # when buffering we emit the close call early and convert the
1267 # application iterator into a regular list
1268 if buffered:
1269 try:
1270 app_iter = list(app_iter)
1271 finally:
1272 if close_func is not None:
1273 close_func()
1275 # otherwise we iterate the application iter until we have a response, chain
1276 # the already received data with the already collected data and wrap it in
1277 # a new `ClosingIterator` if we need to restore a `close` callable from the
1278 # original return value.
1279 else:
1280 for item in app_iter:
1281 buffer.append(item)
1283 if response is not None:
1284 break
1286 if buffer:
1287 app_iter = chain(buffer, app_iter)
1289 if close_func is not None and app_iter is not app_rv:
1290 app_iter = ClosingIterator(app_iter, close_func)
1292 status, headers = response # type: ignore
1293 return app_iter, status, Headers(headers)
1296class TestResponse(Response):
1297 """:class:`~werkzeug.wrappers.Response` subclass that provides extra
1298 information about requests made with the test :class:`Client`.
1300 Test client requests will always return an instance of this class.
1301 If a custom response class is passed to the client, it is
1302 subclassed along with this to support test information.
1304 If the test request included large files, or if the application is
1305 serving a file, call :meth:`close` to close any open files and
1306 prevent Python showing a ``ResourceWarning``.
1308 .. versionchanged:: 2.2
1309 Set the ``default_mimetype`` to None to prevent a mimetype being
1310 assumed if missing.
1312 .. versionchanged:: 2.1
1313 Response instances cannot be treated as tuples.
1315 .. versionadded:: 2.0
1316 Test client methods always return instances of this class.
1317 """
1319 default_mimetype = None
1320 # Don't assume a mimetype, instead use whatever the response provides
1322 request: Request
1323 """A request object with the environ used to make the request that
1324 resulted in this response.
1325 """
1327 history: tuple[TestResponse, ...]
1328 """A list of intermediate responses. Populated when the test request
1329 is made with ``follow_redirects`` enabled.
1330 """
1332 # Tell Pytest to ignore this, it's not a test class.
1333 __test__ = False
1335 def __init__(
1336 self,
1337 response: t.Iterable[bytes],
1338 status: str,
1339 headers: Headers,
1340 request: Request,
1341 history: tuple[TestResponse] = (), # type: ignore
1342 **kwargs: t.Any,
1343 ) -> None:
1344 super().__init__(response, status, headers, **kwargs)
1345 self.request = request
1346 self.history = history
1347 self._compat_tuple = response, status, headers
1349 @cached_property
1350 def text(self) -> str:
1351 """The response data as text. A shortcut for
1352 ``response.get_data(as_text=True)``.
1354 .. versionadded:: 2.1
1355 """
1356 return self.get_data(as_text=True)
1359@dataclasses.dataclass
1360class Cookie:
1361 """A cookie key, value, and parameters.
1363 The class itself is not a public API. Its attributes are documented for inspection
1364 with :meth:`.Client.get_cookie` only.
1366 .. versionadded:: 2.3
1367 """
1369 key: str
1370 """The cookie key, encoded as a client would see it."""
1372 value: str
1373 """The cookie key, encoded as a client would see it."""
1375 decoded_key: str
1376 """The cookie key, decoded as the application would set and see it."""
1378 decoded_value: str
1379 """The cookie value, decoded as the application would set and see it."""
1381 expires: datetime | None
1382 """The time at which the cookie is no longer valid."""
1384 max_age: int | None
1385 """The number of seconds from when the cookie was set at which it is
1386 no longer valid.
1387 """
1389 domain: str
1390 """The domain that the cookie was set for, or the request domain if not set."""
1392 origin_only: bool
1393 """Whether the cookie will be sent for exact domain matches only. This is ``True``
1394 if the ``Domain`` parameter was not present.
1395 """
1397 path: str
1398 """The path that the cookie was set for."""
1400 secure: bool | None
1401 """The ``Secure`` parameter."""
1403 http_only: bool | None
1404 """The ``HttpOnly`` parameter."""
1406 same_site: str | None
1407 """The ``SameSite`` parameter."""
1409 def _matches_request(self, server_name: str, path: str) -> bool:
1410 return (
1411 server_name == self.domain
1412 or (
1413 not self.origin_only
1414 and server_name.endswith(self.domain)
1415 and server_name[: -len(self.domain)].endswith(".")
1416 )
1417 ) and (
1418 path == self.path
1419 or (
1420 path.startswith(self.path)
1421 and path[len(self.path) - self.path.endswith("/") :].startswith("/")
1422 )
1423 )
1425 def _to_request_header(self) -> str:
1426 return f"{self.key}={self.value}"
1428 @classmethod
1429 def _from_response_header(cls, server_name: str, path: str, header: str) -> te.Self:
1430 header, _, parameters_str = header.partition(";")
1431 key, _, value = header.partition("=")
1432 decoded_key, decoded_value = next(parse_cookie(header).items())
1433 params = {}
1435 for item in parameters_str.split(";"):
1436 k, sep, v = item.partition("=")
1437 params[k.strip().lower()] = v.strip() if sep else None
1439 return cls(
1440 key=key.strip(),
1441 value=value.strip(),
1442 decoded_key=decoded_key,
1443 decoded_value=decoded_value,
1444 expires=parse_date(params.get("expires")),
1445 max_age=int(params["max-age"] or 0) if "max-age" in params else None,
1446 domain=params.get("domain") or server_name,
1447 origin_only="domain" not in params,
1448 path=params.get("path") or path.rpartition("/")[0] or "/",
1449 secure="secure" in params,
1450 http_only="httponly" in params,
1451 same_site=params.get("samesite"),
1452 )
1454 @property
1455 def _storage_key(self) -> tuple[str, str, str]:
1456 return self.domain, self.path, self.decoded_key
1458 @property
1459 def _should_delete(self) -> bool:
1460 return self.max_age == 0 or (
1461 self.expires is not None and self.expires.timestamp() == 0
1462 )