Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/test.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import dataclasses
4import mimetypes
5import sys
6import typing as t
7from collections import defaultdict
8from datetime import datetime
9from io import BytesIO
10from itertools import chain
11from random import random
12from tempfile import TemporaryFile
13from time import time
14from urllib.parse import unquote
15from urllib.parse import urlsplit
16from urllib.parse import urlunsplit
18from ._internal import _get_environ
19from ._internal import _wsgi_decoding_dance
20from ._internal import _wsgi_encoding_dance
21from .datastructures import Authorization
22from .datastructures import CallbackDict
23from .datastructures import CombinedMultiDict
24from .datastructures import EnvironHeaders
25from .datastructures import FileMultiDict
26from .datastructures import Headers
27from .datastructures import MultiDict
28from .http import dump_cookie
29from .http import dump_options_header
30from .http import parse_cookie
31from .http import parse_date
32from .http import parse_options_header
33from .sansio.multipart import Data
34from .sansio.multipart import Epilogue
35from .sansio.multipart import Field
36from .sansio.multipart import File
37from .sansio.multipart import MultipartEncoder
38from .sansio.multipart import Preamble
39from .urls import _urlencode
40from .urls import iri_to_uri
41from .utils import cached_property
42from .utils import get_content_type
43from .wrappers.request import Request
44from .wrappers.response import Response
45from .wsgi import ClosingIterator
46from .wsgi import get_current_url
48if t.TYPE_CHECKING:
49 import typing_extensions as te
50 from _typeshed.wsgi import WSGIApplication
51 from _typeshed.wsgi import WSGIEnvironment
def stream_encode_multipart(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: str | None = None,
) -> tuple[t.IO[bytes], int, str]:
    """Encode a dict of values (either strings or file descriptors or
    :class:`FileStorage` objects.) into a multipart encoded string stored
    in a file descriptor.

    :param data: Mapping of field names to values. A value with a ``read``
        attribute is streamed as a file part; anything else is converted
        with ``str`` and sent as a plain field. Lists of values are expanded
        by :func:`_iter_data`.
    :param use_tempfile: If true, spill the output to a temporary file once
        more than ``threshold`` bytes have been written.
    :param threshold: Number of bytes kept in memory before switching to a
        temporary file. Only used when ``use_tempfile`` is true.
    :param boundary: Multipart boundary to use. A time/random based boundary
        is generated when omitted.
    :return: A ``(stream, length, boundary)`` tuple. The stream is rewound
        to position 0 before it is returned.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.
    """
    if boundary is None:
        boundary = f"---------------WerkzeugFormPart_{time()}{random()}"

    stream: t.IO[bytes] = BytesIO()
    total_length = 0
    on_disk = False
    write_binary: t.Callable[[bytes], int]

    if use_tempfile:

        def write_binary(s: bytes) -> int:
            nonlocal stream, total_length, on_disk

            if on_disk:
                return stream.write(s)
            else:
                length = len(s)

                if length + total_length <= threshold:
                    stream.write(s)
                else:
                    # Crossing the threshold: copy what was buffered so far
                    # into a temporary file and keep writing there.
                    new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
                    new_stream.write(stream.getvalue())  # type: ignore
                    new_stream.write(s)
                    stream = new_stream
                    on_disk = True

                total_length += length
                return length

    else:
        write_binary = stream.write

    encoder = MultipartEncoder(boundary.encode())
    write_binary(encoder.send_event(Preamble(data=b"")))

    for key, value in _iter_data(data):
        reader = getattr(value, "read", None)

        if reader is not None:
            filename = getattr(value, "filename", getattr(value, "name", None))
            content_type = getattr(value, "content_type", None)

            if content_type is None:
                guessed = mimetypes.guess_type(filename)[0] if filename else None
                content_type = guessed or "application/octet-stream"

            # Plain file-like objects (e.g. ``BytesIO``) carry no ``headers``
            # attribute; start from an empty header set for them instead of
            # failing with ``AttributeError``.
            headers = getattr(value, "headers", None)

            if headers is None:
                headers = Headers()

            headers.update([("Content-Type", content_type)])

            if filename is None:
                write_binary(encoder.send_event(Field(name=key, headers=headers)))
            else:
                write_binary(
                    encoder.send_event(
                        File(name=key, filename=filename, headers=headers)
                    )
                )

            while True:
                chunk = reader(16384)

                if not chunk:
                    # An empty final chunk tells the encoder the part is done.
                    write_binary(encoder.send_event(Data(data=chunk, more_data=False)))
                    break

                write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
        else:
            if not isinstance(value, str):
                value = str(value)

            write_binary(encoder.send_event(Field(name=key, headers=Headers())))
            write_binary(encoder.send_event(Data(data=value.encode(), more_data=False)))

    write_binary(encoder.send_event(Epilogue(data=b"")))

    length = stream.tell()
    stream.seek(0)
    return stream, length, boundary
def encode_multipart(
    values: t.Mapping[str, t.Any], boundary: str | None = None
) -> tuple[str, bytes]:
    """In-memory variant of `stream_encode_multipart`.

    The values are encoded without a temporary file and the result is
    returned as a (``boundary``, ``data``) tuple where data is bytes.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.
    """
    encoded = stream_encode_multipart(values, use_tempfile=False, boundary=boundary)
    # The reported length (index 1) is not needed; the whole stream is read.
    body_stream, final_boundary = encoded[0], encoded[2]
    return final_boundary, body_stream.read()
def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[tuple[str, t.Any]]:
    """Yield every ``(key, value)`` pair of a mapping, expanding lists.

    A :class:`MultiDict` is iterated with ``items(multi=True)``. For any
    other mapping a ``list`` value produces one pair per element, while
    every other value (tuples included, so they can describe files) is
    yielded unchanged.
    """
    if isinstance(data, MultiDict):
        yield from data.items(multi=True)
        return

    for name, entry in data.items():
        entries = entry if isinstance(entry, list) else [entry]

        for item in entries:
            yield name, item
# Type variable bound to ``MultiDict``; used by ``EnvironBuilder._get_form`` so
# the return type follows whichever storage class is passed in.
_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound="MultiDict[t.Any, t.Any]")
class EnvironBuilder:
    """This class can be used to conveniently create a WSGI environment
    for testing purposes.  It can be used to quickly create WSGI environments
    or request objects from arbitrary data.

    The signature of this class is also used in some other places as of
    Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
    :meth:`Client.open`).  Because of this most of the functionality is
    available through the constructor alone.

    Files and regular form data can be manipulated independently of each
    other with the :attr:`form` and :attr:`files` attributes, but are
    passed with the same argument to the constructor: `data`.

    `data` can be any of these values:

    -   a `str` or `bytes` object: The object is converted into an
        :attr:`input_stream`, the :attr:`content_length` is set and you have to
        provide a :attr:`content_type`.
    -   a `dict` or :class:`MultiDict`: The keys have to be strings. The values
        have to be either any of the following objects, or a list of any of the
        following objects:

        -   a :class:`file`-like object:  These are converted into
            :class:`FileStorage` objects automatically.
        -   a `tuple`:  The :meth:`~FileMultiDict.add_file` method is called
            with the key and the unpacked `tuple` items as positional
            arguments.
        -   a `str`:  The string is set as form data for the associated key.
    -   a file-like object: The object content is loaded in memory and then
        handled like a regular `str` or a `bytes`.

    :param path: the path of the request.  In the WSGI environment this will
                 end up as `PATH_INFO`.  If the `query_string` is not defined
                 and there is a question mark in the `path` everything after
                 it is used as query string.
    :param base_url: the base URL is a URL that is used to extract the WSGI
                     URL scheme, host (server name + server port) and the
                     script root (`SCRIPT_NAME`).
    :param query_string: an optional string or dict with URL parameters.
    :param method: the HTTP method to use, defaults to `GET`.
    :param input_stream: an optional input stream.  Do not specify this and
                         `data`.  As soon as an input stream is set you can't
                         modify :attr:`args` and :attr:`files` unless you
                         set the :attr:`input_stream` to `None` again.
    :param content_type: The content type for the request.  As of 0.5 you
                         don't have to provide this when specifying files
                         and form data via `data`.
    :param content_length: The content length for the request.  You don't
                           have to specify this when providing data via
                           `data`.
    :param errors_stream: an optional error stream that is used for
                          `wsgi.errors`.  Defaults to :data:`stderr`.
    :param multithread: controls `wsgi.multithread`.  Defaults to `False`.
    :param multiprocess: controls `wsgi.multiprocess`.  Defaults to `False`.
    :param run_once: controls `wsgi.run_once`.  Defaults to `False`.
    :param headers: an optional list or :class:`Headers` object of headers.
    :param data: a string or dict of form data or a file-object.
                 See explanation above.
    :param json: An object to be serialized and assigned to ``data``.
        Defaults the content type to ``"application/json"``.
        Serialized with the function assigned to :attr:`json_dumps`.
    :param environ_base: an optional dict of environment defaults.
    :param environ_overrides: an optional dict of environment overrides.
    :param auth: An authorization object to use for the
        ``Authorization`` header value. A ``(username, password)`` tuple
        is a shortcut for ``Basic`` authorization.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.

    .. versionchanged:: 2.1
        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
        header keys in the environ.

    .. versionchanged:: 2.0
        ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
        the query string, not only the path.

    .. versionchanged:: 2.0
        The default :attr:`request_class` is ``Request`` instead of
        ``BaseRequest``.

    .. versionadded:: 2.0
       Added the ``auth`` parameter.

    .. versionadded:: 0.15
        The ``json`` param and :meth:`json_dumps` method.

    .. versionadded:: 0.15
        The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
        the path before percent-decoding. This is not part of the WSGI
        PEP, but many WSGI servers include it.

    .. versionchanged:: 0.6
       ``path`` and ``base_url`` can now be unicode strings that are
       encoded with :func:`iri_to_uri`.
    """

    #: the server protocol to use.  defaults to HTTP/1.1
    server_protocol = "HTTP/1.1"

    #: the wsgi version to use.  defaults to (1, 0)
    wsgi_version = (1, 0)

    #: The default request class used by :meth:`get_request`.
    request_class = Request

    # Imported inside the class body only to bind ``json_dumps``; the name is
    # removed again so it does not become a class attribute.
    import json

    #: The serialization function used when ``json`` is passed.
    json_dumps = staticmethod(json.dumps)
    del json

    _args: MultiDict[str, str] | None
    _query_string: str | None
    _input_stream: t.IO[bytes] | None
    _form: MultiDict[str, str] | None
    _files: FileMultiDict | None

    def __init__(
        self,
        path: str = "/",
        base_url: str | None = None,
        query_string: t.Mapping[str, str] | str | None = None,
        method: str = "GET",
        input_stream: t.IO[bytes] | None = None,
        content_type: str | None = None,
        content_length: int | None = None,
        errors_stream: t.IO[str] | None = None,
        multithread: bool = False,
        multiprocess: bool = False,
        run_once: bool = False,
        headers: Headers | t.Iterable[tuple[str, str]] | None = None,
        data: None | (t.IO[bytes] | str | bytes | t.Mapping[str, t.Any]) = None,
        environ_base: t.Mapping[str, t.Any] | None = None,
        environ_overrides: t.Mapping[str, t.Any] | None = None,
        mimetype: str | None = None,
        json: t.Mapping[str, t.Any] | None = None,
        auth: Authorization | tuple[str, str] | None = None,
    ) -> None:
        if query_string is not None and "?" in path:
            raise ValueError("Query string is defined in the path and as an argument")
        request_uri = urlsplit(path)
        if query_string is None and "?" in path:
            query_string = request_uri.query

        self.path = iri_to_uri(request_uri.path)
        self.request_uri = path
        if base_url is not None:
            base_url = iri_to_uri(base_url)
        self.base_url = base_url  # type: ignore
        if isinstance(query_string, str):
            self.query_string = query_string
        else:
            if query_string is None:
                query_string = MultiDict()
            elif not isinstance(query_string, MultiDict):
                query_string = MultiDict(query_string)
            self.args = query_string
        self.method = method
        if headers is None:
            headers = Headers()
        elif not isinstance(headers, Headers):
            headers = Headers(headers)
        # ``headers`` must be assigned before ``content_type`` and
        # ``content_length``, whose setters write into it.
        self.headers = headers
        if content_type is not None:
            self.content_type = content_type
        if errors_stream is None:
            errors_stream = sys.stderr
        self.errors_stream = errors_stream
        self.multithread = multithread
        self.multiprocess = multiprocess
        self.run_once = run_once
        self.environ_base = environ_base
        self.environ_overrides = environ_overrides
        # Assigning the stream also resets ``_form`` and ``_files`` to None.
        self.input_stream = input_stream
        self.content_length = content_length
        self.closed = False

        if auth is not None:
            if isinstance(auth, tuple):
                auth = Authorization(
                    "basic", {"username": auth[0], "password": auth[1]}
                )

            self.headers.set("Authorization", auth.to_header())

        if json is not None:
            if data is not None:
                raise TypeError("can't provide both json and data")

            data = self.json_dumps(json)

            if self.content_type is None:
                self.content_type = "application/json"

        if data:
            if input_stream is not None:
                raise TypeError("can't provide input stream and data")
            if hasattr(data, "read"):
                data = data.read()
            if isinstance(data, str):
                data = data.encode()
            if isinstance(data, bytes):
                self.input_stream = BytesIO(data)
                if self.content_length is None:
                    self.content_length = len(data)
            else:
                for key, value in _iter_data(data):
                    if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
                        self._add_file_from_data(key, value)
                    else:
                        self.form.setlistdefault(key).append(value)

        if mimetype is not None:
            self.mimetype = mimetype

    @classmethod
    def from_environ(
        cls, environ: WSGIEnvironment, **kwargs: t.Any
    ) -> EnvironBuilder:
        """Turn an environ dict back into a builder. Any extra kwargs
        override the args extracted from the environ.

        .. versionchanged:: 2.0
            Path and query values are passed through the WSGI decoding
            dance to avoid double encoding.

        .. versionadded:: 0.15
        """
        headers = Headers(EnvironHeaders(environ))
        out = {
            "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
            "base_url": cls._make_base_url(
                environ["wsgi.url_scheme"],
                headers.pop("Host"),
                _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
            ),
            "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
            "method": environ["REQUEST_METHOD"],
            "input_stream": environ["wsgi.input"],
            "content_type": headers.pop("Content-Type", None),
            "content_length": headers.pop("Content-Length", None),
            "errors_stream": environ["wsgi.errors"],
            "multithread": environ["wsgi.multithread"],
            "multiprocess": environ["wsgi.multiprocess"],
            "run_once": environ["wsgi.run_once"],
            "headers": headers,
        }
        out.update(kwargs)
        return cls(**out)

    def _add_file_from_data(
        self,
        key: str,
        value: (t.IO[bytes] | tuple[t.IO[bytes], str] | tuple[t.IO[bytes], str, str]),
    ) -> None:
        """Called in the EnvironBuilder to add files from the data dict."""
        if isinstance(value, tuple):
            self.files.add_file(key, *value)
        else:
            self.files.add_file(key, value)

    @staticmethod
    def _make_base_url(scheme: str, host: str, script_root: str) -> str:
        """Join scheme, host and script root into a URL that always ends
        with exactly one trailing slash.
        """
        return urlunsplit((scheme, host, script_root, "", "")).rstrip("/") + "/"

    @property
    def base_url(self) -> str:
        """The base URL is used to extract the URL scheme, host name,
        port, and root path.
        """
        return self._make_base_url(self.url_scheme, self.host, self.script_root)

    @base_url.setter
    def base_url(self, value: str | None) -> None:
        # ``None`` selects the defaults: http://localhost with no script root.
        if value is None:
            scheme = "http"
            netloc = "localhost"
            script_root = ""
        else:
            scheme, netloc, script_root, qs, anchor = urlsplit(value)
            if qs or anchor:
                raise ValueError("base url must not contain a query string or fragment")
        self.script_root = script_root.rstrip("/")
        self.host = netloc
        self.url_scheme = scheme

    @property
    def content_type(self) -> str | None:
        """The content type for the request.  Reflected from and to
        the :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        ct = self.headers.get("Content-Type")
        if ct is None and not self._input_stream:
            # No explicit header and no raw body: infer from the stored data.
            if self._files:
                return "multipart/form-data"
            if self._form:
                return "application/x-www-form-urlencoded"
            return None
        return ct

    @content_type.setter
    def content_type(self, value: str | None) -> None:
        if value is None:
            self.headers.pop("Content-Type", None)
        else:
            self.headers["Content-Type"] = value

    @property
    def mimetype(self) -> str | None:
        """The mimetype (content type without charset etc.)

        .. versionadded:: 0.14
        """
        ct = self.content_type
        return ct.split(";")[0].strip() if ct else None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        self.content_type = get_content_type(value, "utf-8")

    @property
    def mimetype_params(self) -> t.Mapping[str, str]:
        """The mimetype parameters as dict.  For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.14
        """

        def on_update(d: CallbackDict[str, str]) -> None:
            # Write changes made to the returned dict back to the header.
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        d = parse_options_header(self.headers.get("content-type", ""))[1]
        return CallbackDict(d, on_update)

    @property
    def content_length(self) -> int | None:
        """The content length as integer.  Reflected from and to the
        :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        return self.headers.get("Content-Length", type=int)

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        if value is None:
            self.headers.pop("Content-Length", None)
        else:
            self.headers["Content-Length"] = str(value)

    def _get_form(self, name: str, storage: type[_TAnyMultiDict]) -> _TAnyMultiDict:
        """Common behavior for getting the :attr:`form` and
        :attr:`files` properties.

        :param name: Name of the internal cached attribute.
        :param storage: Storage class used for the data.
        :raises AttributeError: If an input stream is defined.
        """
        if self.input_stream is not None:
            raise AttributeError("an input stream is defined")

        rv = getattr(self, name)

        if rv is None:
            # Create the storage lazily on first access.
            rv = storage()
            setattr(self, name, rv)

        return rv  # type: ignore

    def _set_form(self, name: str, value: MultiDict[str, t.Any]) -> None:
        """Common behavior for setting the :attr:`form` and
        :attr:`files` properties.

        :param name: Name of the internal cached attribute.
        :param value: Value to assign to the attribute.
        """
        self._input_stream = None
        setattr(self, name, value)

    @property
    def form(self) -> MultiDict[str, str]:
        """A :class:`MultiDict` of form values."""
        return self._get_form("_form", MultiDict)

    @form.setter
    def form(self, value: MultiDict[str, str]) -> None:
        self._set_form("_form", value)

    @property
    def files(self) -> FileMultiDict:
        """A :class:`FileMultiDict` of uploaded files. Use
        :meth:`~FileMultiDict.add_file` to add new files.
        """
        return self._get_form("_files", FileMultiDict)

    @files.setter
    def files(self, value: FileMultiDict) -> None:
        self._set_form("_files", value)

    @property
    def input_stream(self) -> t.IO[bytes] | None:
        """An optional input stream. This is mutually exclusive with
        setting :attr:`form` and :attr:`files`, setting it will clear
        those. Do not provide this if the method is not ``POST`` or
        another method that has a body.
        """
        return self._input_stream

    @input_stream.setter
    def input_stream(self, value: t.IO[bytes] | None) -> None:
        self._input_stream = value
        self._form = None
        self._files = None

    @property
    def query_string(self) -> str:
        """The query string.  If you set this to a string
        :attr:`args` will no longer be available.
        """
        if self._query_string is None:
            if self._args is not None:
                return _urlencode(self._args)
            return ""
        return self._query_string

    @query_string.setter
    def query_string(self, value: str | None) -> None:
        self._query_string = value
        self._args = None

    @property
    def args(self) -> MultiDict[str, str]:
        """The URL arguments as :class:`MultiDict`."""
        if self._query_string is not None:
            raise AttributeError("a query string is defined")
        if self._args is None:
            self._args = MultiDict()
        return self._args

    @args.setter
    def args(self, value: MultiDict[str, str] | None) -> None:
        self._query_string = None
        self._args = value

    @property
    def server_name(self) -> str:
        """The server name (read-only, use :attr:`host` to set)"""
        return self.host.split(":", 1)[0]

    @property
    def server_port(self) -> int:
        """The server port as integer (read-only, use :attr:`host` to set)"""
        pieces = self.host.split(":", 1)

        if len(pieces) == 2:
            try:
                return int(pieces[1])
            except ValueError:
                # Not a numeric port; fall back to the scheme default below.
                pass

        if self.url_scheme == "https":
            return 443
        return 80

    def __del__(self) -> None:
        # Best-effort cleanup: errors are deliberately swallowed here.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """Closes all files.  If you put real :class:`file` objects into the
        :attr:`files` dict you can call this method to automatically close
        them all in one go.
        """
        if self.closed:
            return
        try:
            storage = self.files
        except AttributeError:
            # An input stream is defined, so there are no files to close.
            storage = None
        if storage is not None:
            # ``values()`` only yields the first value stored for each key;
            # walk the full value lists so no file under a repeated key is
            # left open.
            for f in chain.from_iterable(storage.listvalues()):
                try:
                    f.close()
                except Exception:
                    pass
        self.closed = True

    def get_environ(self) -> WSGIEnvironment:
        """Return the built environ.

        .. versionchanged:: 0.15
            The content type and length headers are set based on
            input stream detection. Previously this only set the WSGI
            keys.
        """
        input_stream = self.input_stream
        content_length = self.content_length

        mimetype = self.mimetype
        content_type = self.content_type

        if input_stream is not None:
            # Measure the bytes remaining from the current position, then
            # restore that position.
            start_pos = input_stream.tell()
            input_stream.seek(0, 2)
            end_pos = input_stream.tell()
            input_stream.seek(start_pos)
            content_length = end_pos - start_pos
        elif mimetype == "multipart/form-data":
            input_stream, content_length, boundary = stream_encode_multipart(
                CombinedMultiDict([self.form, self.files])
            )
            content_type = f'{mimetype}; boundary="{boundary}"'
        elif mimetype == "application/x-www-form-urlencoded":
            form_encoded = _urlencode(self.form).encode("ascii")
            content_length = len(form_encoded)
            input_stream = BytesIO(form_encoded)
        else:
            input_stream = BytesIO()

        result: WSGIEnvironment = {}
        if self.environ_base:
            result.update(self.environ_base)

        def _path_encode(x: str) -> str:
            return _wsgi_encoding_dance(unquote(x))

        raw_uri = _wsgi_encoding_dance(self.request_uri)
        result.update(
            {
                "REQUEST_METHOD": self.method,
                "SCRIPT_NAME": _path_encode(self.script_root),
                "PATH_INFO": _path_encode(self.path),
                "QUERY_STRING": _wsgi_encoding_dance(self.query_string),
                # Non-standard, added by mod_wsgi, uWSGI
                "REQUEST_URI": raw_uri,
                # Non-standard, added by gunicorn
                "RAW_URI": raw_uri,
                "SERVER_NAME": self.server_name,
                "SERVER_PORT": str(self.server_port),
                "HTTP_HOST": self.host,
                "SERVER_PROTOCOL": self.server_protocol,
                "wsgi.version": self.wsgi_version,
                "wsgi.url_scheme": self.url_scheme,
                "wsgi.input": input_stream,
                "wsgi.errors": self.errors_stream,
                "wsgi.multithread": self.multithread,
                "wsgi.multiprocess": self.multiprocess,
                "wsgi.run_once": self.run_once,
            }
        )

        headers = self.headers.copy()
        # Don't send these as headers, they're part of the environ.
        headers.remove("Content-Type")
        headers.remove("Content-Length")

        if content_type is not None:
            result["CONTENT_TYPE"] = content_type

        if content_length is not None:
            result["CONTENT_LENGTH"] = str(content_length)

        # Headers that occur several times are folded into one
        # comma-separated environ value.
        combined_headers = defaultdict(list)

        for key, value in headers.to_wsgi_list():
            combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)

        for key, values in combined_headers.items():
            result[key] = ", ".join(values)

        if self.environ_overrides:
            result.update(self.environ_overrides)

        return result

    def get_request(self, cls: type[Request] | None = None) -> Request:
        """Returns a request with the data.  If the request class is not
        specified :attr:`request_class` is used.

        :param cls: The request wrapper to use.
        """
        if cls is None:
            cls = self.request_class

        return cls(self.get_environ())
class ClientRedirectError(Exception):
    """If a redirect loop is detected when using follow_redirects=True with
    the :class:`Client`, then this exception is raised.
    """
773class Client:
774 """Simulate sending requests to a WSGI application without running a WSGI or HTTP
775 server.
777 :param application: The WSGI application to make requests to.
778 :param response_wrapper: A :class:`.Response` class to wrap response data with.
779 Defaults to :class:`.TestResponse`. If it's not a subclass of ``TestResponse``,
780 one will be created.
781 :param use_cookies: Persist cookies from ``Set-Cookie`` response headers to the
782 ``Cookie`` header in subsequent requests. Domain and path matching is supported,
783 but other cookie parameters are ignored.
784 :param allow_subdomain_redirects: Allow requests to follow redirects to subdomains.
785 Enable this if the application handles subdomains and redirects between them.
787 .. versionchanged:: 2.3
788 Simplify cookie implementation, support domain and path matching.
790 .. versionchanged:: 2.1
791 All data is available as properties on the returned response object. The
792 response cannot be returned as a tuple.
794 .. versionchanged:: 2.0
795 ``response_wrapper`` is always a subclass of :class:``TestResponse``.
797 .. versionchanged:: 0.5
798 Added the ``use_cookies`` parameter.
799 """
801 def __init__(
802 self,
803 application: WSGIApplication,
804 response_wrapper: type[Response] | None = None,
805 use_cookies: bool = True,
806 allow_subdomain_redirects: bool = False,
807 ) -> None:
808 self.application = application
810 if response_wrapper in {None, Response}:
811 response_wrapper = TestResponse
812 elif response_wrapper is not None and not issubclass(
813 response_wrapper, TestResponse
814 ):
815 response_wrapper = type(
816 "WrapperTestResponse",
817 (TestResponse, response_wrapper),
818 {},
819 )
821 self.response_wrapper = t.cast(type["TestResponse"], response_wrapper)
823 if use_cookies:
824 self._cookies: dict[tuple[str, str, str], Cookie] | None = {}
825 else:
826 self._cookies = None
828 self.allow_subdomain_redirects = allow_subdomain_redirects
830 def get_cookie(
831 self, key: str, domain: str = "localhost", path: str = "/"
832 ) -> Cookie | None:
833 """Return a :class:`.Cookie` if it exists. Cookies are uniquely identified by
834 ``(domain, path, key)``.
836 :param key: The decoded form of the key for the cookie.
837 :param domain: The domain the cookie was set for.
838 :param path: The path the cookie was set for.
840 .. versionadded:: 2.3
841 """
842 if self._cookies is None:
843 raise TypeError(
844 "Cookies are disabled. Create a client with 'use_cookies=True'."
845 )
847 return self._cookies.get((domain, path, key))
849 def set_cookie(
850 self,
851 key: str,
852 value: str = "",
853 *,
854 domain: str = "localhost",
855 origin_only: bool = True,
856 path: str = "/",
857 **kwargs: t.Any,
858 ) -> None:
859 """Set a cookie to be sent in subsequent requests.
861 This is a convenience to skip making a test request to a route that would set
862 the cookie. To test the cookie, make a test request to a route that uses the
863 cookie value.
865 The client uses ``domain``, ``origin_only``, and ``path`` to determine which
866 cookies to send with a request. It does not use other cookie parameters that
867 browsers use, since they're not applicable in tests.
869 :param key: The key part of the cookie.
870 :param value: The value part of the cookie.
871 :param domain: Send this cookie with requests that match this domain. If
872 ``origin_only`` is true, it must be an exact match, otherwise it may be a
873 suffix match.
874 :param origin_only: Whether the domain must be an exact match to the request.
875 :param path: Send this cookie with requests that match this path either exactly
876 or as a prefix.
877 :param kwargs: Passed to :func:`.dump_cookie`.
879 .. versionchanged:: 3.0
880 The parameter ``server_name`` is removed. The first parameter is
881 ``key``. Use the ``domain`` and ``origin_only`` parameters instead.
883 .. versionchanged:: 2.3
884 The ``origin_only`` parameter was added.
886 .. versionchanged:: 2.3
887 The ``domain`` parameter defaults to ``localhost``.
888 """
889 if self._cookies is None:
890 raise TypeError(
891 "Cookies are disabled. Create a client with 'use_cookies=True'."
892 )
894 cookie = Cookie._from_response_header(
895 domain, "/", dump_cookie(key, value, domain=domain, path=path, **kwargs)
896 )
897 cookie.origin_only = origin_only
899 if cookie._should_delete:
900 self._cookies.pop(cookie._storage_key, None)
901 else:
902 self._cookies[cookie._storage_key] = cookie
904 def delete_cookie(
905 self,
906 key: str,
907 *,
908 domain: str = "localhost",
909 path: str = "/",
910 ) -> None:
911 """Delete a cookie if it exists. Cookies are uniquely identified by
912 ``(domain, path, key)``.
914 :param key: The decoded form of the key for the cookie.
915 :param domain: The domain the cookie was set for.
916 :param path: The path the cookie was set for.
918 .. versionchanged:: 3.0
919 The ``server_name`` parameter is removed. The first parameter is
920 ``key``. Use the ``domain`` parameter instead.
922 .. versionchanged:: 3.0
923 The ``secure``, ``httponly`` and ``samesite`` parameters are removed.
925 .. versionchanged:: 2.3
926 The ``domain`` parameter defaults to ``localhost``.
927 """
928 if self._cookies is None:
929 raise TypeError(
930 "Cookies are disabled. Create a client with 'use_cookies=True'."
931 )
933 self._cookies.pop((domain, path, key), None)
935 def _add_cookies_to_wsgi(self, environ: WSGIEnvironment) -> None:
936 """If cookies are enabled, set the ``Cookie`` header in the environ to the
937 cookies that are applicable to the request host and path.
939 :meta private:
941 .. versionadded:: 2.3
942 """
943 if self._cookies is None:
944 return
946 url = urlsplit(get_current_url(environ))
947 server_name = url.hostname or "localhost"
948 value = "; ".join(
949 c._to_request_header()
950 for c in self._cookies.values()
951 if c._matches_request(server_name, url.path)
952 )
954 if value:
955 environ["HTTP_COOKIE"] = value
956 else:
957 environ.pop("HTTP_COOKIE", None)
959 def _update_cookies_from_response(
960 self, server_name: str, path: str, headers: list[str]
961 ) -> None:
962 """If cookies are enabled, update the stored cookies from any ``Set-Cookie``
963 headers in the response.
965 :meta private:
967 .. versionadded:: 2.3
968 """
969 if self._cookies is None:
970 return
972 for header in headers:
973 cookie = Cookie._from_response_header(server_name, path, header)
975 if cookie._should_delete:
976 self._cookies.pop(cookie._storage_key, None)
977 else:
978 self._cookies[cookie._storage_key] = cookie
def run_wsgi_app(
    self, environ: WSGIEnvironment, buffered: bool = False
) -> tuple[t.Iterable[bytes], str, Headers]:
    """Run the wrapped WSGI application with ``environ``.

    Stored cookies are added to the environ before the call, and the
    ``Set-Cookie`` headers of the result are applied to the stored cookies
    afterwards. The ``(app_iter, status, headers)`` tuple produced by the
    module-level ``run_wsgi_app`` function is returned unchanged.

    :meta private:
    """
    self._add_cookies_to_wsgi(environ)
    result = run_wsgi_app(self.application, environ, buffered=buffered)

    request_url = urlsplit(get_current_url(environ))
    host = request_url.hostname or "localhost"
    # The third element of the result is the response ``Headers`` object.
    set_cookie_headers = result[2].getlist("Set-Cookie")
    self._update_cookies_from_response(host, request_url.path, set_cookie_headers)

    return result
def resolve_redirect(
    self, response: TestResponse, buffered: bool = False
) -> TestResponse:
    """Issue the follow-up request for a redirect ``response``.

    A new :class:`EnvironBuilder` is derived from the environ of the request
    that produced ``response``, pointed at the redirect location, and passed
    to :meth:`open`.

    :raises RuntimeError: If the location is on a subdomain of the current
        server name while ``allow_subdomain_redirects`` is false, or if it
        is on an unrelated host.

    :meta private:
    """
    target = urlsplit(response.location)
    path = target.path
    builder = EnvironBuilder.from_environ(
        response.request.environ, path=path, query_string=target.query
    )

    # Read the current server name before the host is possibly replaced below.
    current_labels = builder.server_name.split(".")
    target_host = target.netloc.partition(":")[0]

    if target_host:
        # The new location has a host, use it for the base URL.
        builder.url_scheme = target.scheme
        builder.host = target.netloc
        target_labels = target_host.split(".")
    else:
        # A local redirect with autocorrect_location_header=False
        # doesn't have a host, so use the request's host.
        target_labels = current_labels

    # Explain why a redirect to a different server name won't be followed.
    if target_labels != current_labels:
        is_subdomain = target_labels[-len(current_labels) :] == current_labels

        if not is_subdomain:
            raise RuntimeError("Following external redirects is not supported.")

        if not self.allow_subdomain_redirects:
            raise RuntimeError("Following subdomain redirects is not enabled.")

    script_root = builder.script_root
    root_segments = script_root.split("/")

    if path.split("/")[: len(root_segments)] == root_segments:
        # Strip the script root from the path.
        builder.path = path[len(script_root) :]
    else:
        # The new location is not under the script root, so use the
        # whole path and clear the previous root.
        builder.path = path
        builder.script_root = ""

    # Only 307 and 308 preserve all of the original request.
    keeps_original_request = response.status_code in {307, 308}

    if not keeps_original_request:
        # HEAD is preserved, everything else becomes GET.
        if builder.method != "HEAD":
            builder.method = "GET"

        # Clear the body and the headers that describe it.
        stream = builder.input_stream

        if stream is not None:
            stream.close()
            builder.input_stream = None

        builder.content_type = None
        builder.content_length = None
        builder.headers.pop("Transfer-Encoding", None)

    return self.open(builder, buffered=buffered)
def open(
    self,
    *args: t.Any,
    buffered: bool = False,
    follow_redirects: bool = False,
    **kwargs: t.Any,
) -> TestResponse:
    """Build an environ from the arguments, run the application with it,
    and return the wrapped response.

    :param args: Passed to :class:`EnvironBuilder` to create the
        environ for the request. If a single arg is passed, it can
        be an existing :class:`EnvironBuilder`, an environ dict, or a
        request object.
    :param buffered: Convert the iterator returned by the app into
        a list. If the iterator has a ``close()`` method, it is
        called automatically.
    :param follow_redirects: Make additional requests to follow HTTP
        redirects until a non-redirect status is returned.
        :attr:`TestResponse.history` lists the intermediate
        responses.
    :raises ClientRedirectError: If the same location and status code
        pair is seen twice while following redirects.

    .. versionchanged:: 2.1
        Removed the ``as_tuple`` parameter.

    .. versionchanged:: 2.0
        The request input stream is closed when calling
        ``response.close()``. Input streams for redirects are
        automatically closed.

    .. versionchanged:: 0.5
        If a dict is provided as file in the dict for the ``data``
        parameter the content type has to be called ``content_type``
        instead of ``mimetype``. This change was made for
        consistency with :class:`werkzeug.FileWrapper`.

    .. versionchanged:: 0.5
        Added the ``follow_redirects`` parameter.
    """
    request: Request | None = None

    # A lone positional argument may already describe the whole request.
    if len(args) == 1 and not kwargs:
        (candidate,) = args

        if isinstance(candidate, EnvironBuilder):
            request = candidate.get_request()
        elif isinstance(candidate, dict):
            request = EnvironBuilder.from_environ(candidate).get_request()
        elif isinstance(candidate, Request):
            request = candidate

    if request is None:
        builder = EnvironBuilder(*args, **kwargs)

        try:
            request = builder.get_request()
        finally:
            builder.close()

    response_parts = self.run_wsgi_app(request.environ, buffered=buffered)
    response = self.response_wrapper(*response_parts, request=request)

    if not follow_redirects:
        return response

    redirect_codes = {301, 302, 303, 305, 307, 308}
    visited = set()
    trail: list[TestResponse] = []

    while response.status_code in redirect_codes:
        # Exhaust intermediate response bodies to ensure middleware
        # that returns an iterator runs any cleanup code.
        if not buffered:
            response.make_sequence()
            response.close()

        hop = (response.location, response.status_code)

        if hop in visited:
            raise ClientRedirectError(
                f"Loop detected: A {response.status_code} redirect"
                f" to {response.location} was already made."
            )

        visited.add(hop)
        response.history = tuple(trail)
        trail.append(response)
        response = self.resolve_redirect(response, buffered=buffered)

    # This is the final request after redirects.
    response.history = tuple(trail)
    # Close the input stream when closing the response, in case
    # the input is an open temporary file.
    response.call_on_close(request.input_stream.close)
    return response
def get(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Shortcut for :meth:`open` that forces ``method`` to ``GET``."""
    return self.open(*args, **{**kw, "method": "GET"})
def post(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Shortcut for :meth:`open` that forces ``method`` to ``POST``."""
    return self.open(*args, **{**kw, "method": "POST"})
def put(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Shortcut for :meth:`open` that forces ``method`` to ``PUT``."""
    return self.open(*args, **{**kw, "method": "PUT"})
def delete(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Shortcut for :meth:`open` that forces ``method`` to ``DELETE``."""
    return self.open(*args, **{**kw, "method": "DELETE"})
def patch(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Shortcut for :meth:`open` that forces ``method`` to ``PATCH``."""
    return self.open(*args, **{**kw, "method": "PATCH"})
def options(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Shortcut for :meth:`open` that forces ``method`` to ``OPTIONS``."""
    return self.open(*args, **{**kw, "method": "OPTIONS"})
def head(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Shortcut for :meth:`open` that forces ``method`` to ``HEAD``."""
    return self.open(*args, **{**kw, "method": "HEAD"})
def trace(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Shortcut for :meth:`open` that forces ``method`` to ``TRACE``."""
    return self.open(*args, **{**kw, "method": "TRACE"})
1199 def __repr__(self) -> str:
1200 return f"<{type(self).__name__} {self.application!r}>"
def create_environ(*args: t.Any, **kwargs: t.Any) -> WSGIEnvironment:
    """Build a WSGI environ dict from the given values.

    The first parameter should be the path of the request, which defaults to
    '/'. The second one can either be an absolute path (in that case the host
    is localhost:80) or a full path to the request with scheme, netloc port
    and the path to the script.

    All arguments are forwarded to the :class:`EnvironBuilder` constructor.
    The temporary builder is always closed, even if creating the environ
    raises.

    .. versionchanged:: 0.5
        This function is now a thin wrapper over :class:`EnvironBuilder` which
        was added in 0.5.  The `headers`, `environ_base`, `environ_overrides`
        and `charset` parameters were added.
    """
    env_builder = EnvironBuilder(*args, **kwargs)

    try:
        environ = env_builder.get_environ()
    finally:
        env_builder.close()

    return environ
def run_wsgi_app(
    app: WSGIApplication, environ: WSGIEnvironment, buffered: bool = False
) -> tuple[t.Iterable[bytes], str, Headers]:
    """Return a tuple in the form (app_iter, status, headers) of the
    application output. This works best if you pass it an application that
    returns an iterator all the time.

    Sometimes applications may use the `write()` callable returned
    by the `start_response` function. This tries to resolve such edge
    cases automatically. But if you don't get the expected output you
    should set `buffered` to `True` which enforces buffering.

    In both modes, data passed to `write()` is placed in front of the
    chunks produced by the application iterator.

    If passed an invalid WSGI application the behavior of this function is
    undefined. Never pass non-conforming WSGI applications to this function.

    :param app: the application to execute.
    :param environ: the WSGI environ to call the application with. A copy
        is passed to the application.
    :param buffered: set to `True` to enforce buffering.
    :return: tuple in the form ``(app_iter, status, headers)``
    """
    # Copy environ to ensure any mutations by the app (ProxyFix, for
    # example) don't affect subsequent requests (such as redirects).
    environ = _get_environ(environ).copy()
    status: str
    response: tuple[str, list[tuple[str, str]]] | None = None
    # Receives data sent through ``write()``; in unbuffered mode it also
    # holds the chunks consumed while waiting for ``start_response``.
    buffer: list[bytes] = []

    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal response

        if exc_info:
            try:
                raise exc_info[1].with_traceback(exc_info[2])
            finally:
                # Drop the reference to avoid a traceback reference cycle.
                exc_info = None

        response = (status, headers)
        return buffer.append

    app_rv = app(environ, start_response)
    close_func = getattr(app_rv, "close", None)
    app_iter: t.Iterable[bytes] = iter(app_rv)

    # when buffering we emit the close call early and convert the
    # application iterator into a regular list
    if buffered:
        try:
            body = list(app_iter)
        finally:
            if close_func is not None:
                close_func()

        # Keep anything the application sent through ``write()``. It was
        # previously dropped in buffered mode although the unbuffered
        # branch below includes it.
        app_iter = buffer + body

    # otherwise we iterate the application iter until we have a response, chain
    # the already received data with the already collected data and wrap it in
    # a new `ClosingIterator` if we need to restore a `close` callable from the
    # original return value.
    else:
        for item in app_iter:
            buffer.append(item)

            if response is not None:
                break

        if buffer:
            app_iter = chain(buffer, app_iter)

        if close_func is not None and app_iter is not app_rv:
            app_iter = ClosingIterator(app_iter, close_func)

    status, headers = response  # type: ignore
    return app_iter, status, Headers(headers)
class TestResponse(Response):
    """:class:`~werkzeug.wrappers.Response` subclass that provides extra
    information about requests made with the test :class:`Client`.

    Test client requests will always return an instance of this class.
    If a custom response class is passed to the client, it is
    subclassed along with this to support test information.

    If the test request included large files, or if the application is
    serving a file, call :meth:`close` to close any open files and
    prevent Python showing a ``ResourceWarning``.

    .. versionchanged:: 2.2
        Set the ``default_mimetype`` to None to prevent a mimetype being
        assumed if missing.

    .. versionchanged:: 2.1
        Response instances cannot be treated as tuples.

    .. versionadded:: 2.0
        Test client methods always return instances of this class.
    """

    default_mimetype = None
    # Don't assume a mimetype, instead use whatever the response provides

    request: Request
    """A request object with the environ used to make the request that
    resulted in this response.
    """

    history: tuple[TestResponse, ...]
    """A tuple of intermediate responses. Populated when the test request
    is made with ``follow_redirects`` enabled.
    """

    # Tell Pytest to ignore this, it's not a test class.
    __test__ = False

    def __init__(
        self,
        response: t.Iterable[bytes],
        status: str,
        headers: Headers,
        request: Request,
        history: tuple[TestResponse, ...] = (),
        **kwargs: t.Any,
    ) -> None:
        """Create the response and attach the test-only metadata.

        :param response: The response body iterable.
        :param status: The status line.
        :param headers: The response headers.
        :param request: The request that produced this response.
        :param history: Intermediate redirect responses, empty by default.
        :param kwargs: Passed on to the base response class.
        """
        super().__init__(response, status, headers, **kwargs)
        self.request = request
        self.history = history
        self._compat_tuple = response, status, headers

    @cached_property
    def text(self) -> str:
        """The response data as text. A shortcut for
        ``response.get_data(as_text=True)``.

        .. versionadded:: 2.1
        """
        return self.get_data(as_text=True)
@dataclasses.dataclass
class Cookie:
    """A cookie key, value, and parameters.

    The class itself is not a public API. Its attributes are documented for inspection
    with :meth:`.Client.get_cookie` only.

    .. versionadded:: 2.3
    """

    key: str
    """The cookie key, encoded as a client would see it."""

    value: str
    """The cookie value, encoded as a client would see it."""

    decoded_key: str
    """The cookie key, decoded as the application would set and see it."""

    decoded_value: str
    """The cookie value, decoded as the application would set and see it."""

    expires: datetime | None
    """The time at which the cookie is no longer valid."""

    max_age: int | None
    """The number of seconds from when the cookie was set at which it is
    no longer valid.
    """

    domain: str
    """The domain that the cookie was set for, or the request domain if not set."""

    origin_only: bool
    """Whether the cookie will be sent for exact domain matches only. This is ``True``
    if the ``Domain`` parameter was not present.
    """

    path: str
    """The path that the cookie was set for."""

    secure: bool | None
    """The ``Secure`` parameter."""

    http_only: bool | None
    """The ``HttpOnly`` parameter."""

    same_site: str | None
    """The ``SameSite`` parameter."""

    def _matches_request(self, server_name: str, path: str) -> bool:
        """Whether this cookie applies to a request for ``server_name`` and ``path``.

        The host matches if it equals the cookie domain, or, unless the cookie
        is origin-only, if it is a subdomain of it. The path matches if it
        equals the cookie path or lies below it on a ``/`` boundary.
        """
        domain_matches = server_name == self.domain or (
            not self.origin_only
            and server_name.endswith(self.domain)
            # Require a label boundary so "badexample.com" does not match
            # a cookie set for "example.com".
            and server_name[: -len(self.domain)].endswith(".")
        )

        if not domain_matches:
            return False

        if path == self.path:
            return True

        # The character right after the cookie path (or its trailing slash)
        # must be a "/" so that "/ab" does not match a cookie for "/a".
        boundary = len(self.path) - self.path.endswith("/")
        return path.startswith(self.path) and path[boundary:].startswith("/")

    def _to_request_header(self) -> str:
        """Return the ``key=value`` pair as sent in a ``Cookie`` request header."""
        return f"{self.key}={self.value}"

    @classmethod
    def _from_response_header(cls, server_name: str, path: str, header: str) -> te.Self:
        """Parse one ``Set-Cookie`` header value into a cookie.

        :param server_name: Host of the request; used as the domain when the
            header has no ``Domain`` parameter.
        :param path: Path of the request; its directory part is used when the
            header has no ``Path`` parameter.
        :param header: The value of the ``Set-Cookie`` header.
        """
        header, _, parameters_str = header.partition(";")
        key, _, value = header.partition("=")
        decoded_key, decoded_value = next(parse_cookie(header).items())  # type: ignore[call-overload]
        params: dict[str, str | None] = {}

        for item in parameters_str.split(";"):
            k, sep, v = item.partition("=")
            name = k.strip().lower()

            if not name:
                # Produced by a header without parameters or a trailing ";".
                continue

            params[name] = v.strip() if sep else None

        return cls(
            key=key.strip(),
            value=value.strip(),
            decoded_key=decoded_key,
            decoded_value=decoded_value,
            expires=parse_date(params.get("expires")),
            max_age=int(params["max-age"] or 0) if "max-age" in params else None,
            domain=params.get("domain") or server_name,
            origin_only="domain" not in params,
            path=params.get("path") or path.rpartition("/")[0] or "/",
            secure="secure" in params,
            http_only="httponly" in params,
            same_site=params.get("samesite"),
        )

    @property
    def _storage_key(self) -> tuple[str, str, str]:
        """The ``(domain, path, decoded_key)`` tuple identifying this cookie."""
        return self.domain, self.path, self.decoded_key

    @property
    def _should_delete(self) -> bool:
        """Whether the header that produced this cookie asks for its removal.

        That is the case for a zero or negative ``Max-Age`` (RFC 6265 expires
        both immediately), or for an ``Expires`` at the Unix epoch.
        """
        if self.max_age is not None and self.max_age <= 0:
            return True

        return self.expires is not None and self.expires.timestamp() == 0