Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/test.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import dataclasses
4import json
5import mimetypes
6import sys
7import typing as t
8from collections import defaultdict
9from datetime import datetime
10from io import BytesIO
11from itertools import chain
12from random import random
13from tempfile import TemporaryFile
14from time import time
15from types import TracebackType
16from urllib.parse import unquote
17from urllib.parse import urlsplit
18from urllib.parse import urlunsplit
20from ._internal import _get_environ
21from ._internal import _wsgi_decoding_dance
22from ._internal import _wsgi_encoding_dance
23from .datastructures import Authorization
24from .datastructures import CallbackDict
25from .datastructures import CombinedMultiDict
26from .datastructures import EnvironHeaders
27from .datastructures import FileMultiDict
28from .datastructures import Headers
29from .datastructures import MultiDict
30from .http import dump_cookie
31from .http import dump_options_header
32from .http import parse_cookie
33from .http import parse_date
34from .http import parse_options_header
35from .sansio.multipart import Data
36from .sansio.multipart import Epilogue
37from .sansio.multipart import Field
38from .sansio.multipart import File
39from .sansio.multipart import MultipartEncoder
40from .sansio.multipart import Preamble
41from .urls import _urlencode
42from .urls import iri_to_uri
43from .utils import cached_property
44from .utils import get_content_type
45from .wrappers.request import Request
46from .wrappers.response import Response
47from .wsgi import ClosingIterator
48from .wsgi import get_current_url
50if t.TYPE_CHECKING:
51 import typing_extensions as te
52 from _typeshed.wsgi import WSGIApplication
53 from _typeshed.wsgi import WSGIEnvironment
def stream_encode_multipart(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: str | None = None,
) -> tuple[t.IO[bytes], int, str]:
    """Encode a dict of values (either strings or file descriptors or
    :class:`FileStorage` objects.) into a multipart encoded string stored
    in a file descriptor.

    :param data: Mapping of field names to values. Values with a ``read``
        attribute are sent as file/stream parts, everything else is
        converted with :class:`str` and sent as a plain field. A list value
        produces one part per item.
    :param use_tempfile: If true, spill the encoded body to a temporary
        file once it grows past ``threshold`` bytes.
    :param threshold: Maximum number of bytes kept in memory before
        switching to a temporary file. Only used with ``use_tempfile``.
    :param boundary: The multipart boundary. A random one is generated if
        not given.
    :return: A ``(stream, length, boundary)`` tuple. The stream is rewound
        to position 0.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.
    """
    if boundary is None:
        boundary = f"---------------WerkzeugFormPart_{time()}{random()}"

    stream: t.IO[bytes] = BytesIO()
    total_length = 0
    on_disk = False
    write_binary: t.Callable[[bytes], int]

    if use_tempfile:

        def write_binary(s: bytes) -> int:
            nonlocal stream, total_length, on_disk

            if on_disk:
                return stream.write(s)
            else:
                length = len(s)

                if length + total_length <= threshold:
                    stream.write(s)
                else:
                    # Crossing the threshold: copy what was buffered so far
                    # into a temp file and keep writing there from now on.
                    new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
                    new_stream.write(stream.getvalue())  # type: ignore
                    new_stream.write(s)
                    stream = new_stream
                    on_disk = True

                total_length += length
                return length

    else:
        write_binary = stream.write

    encoder = MultipartEncoder(boundary.encode())
    write_binary(encoder.send_event(Preamble(data=b"")))

    for key, value in _iter_data(data):
        reader = getattr(value, "read", None)

        if reader is not None:
            filename = getattr(value, "filename", getattr(value, "name", None))
            content_type = getattr(value, "content_type", None)

            if content_type is None:
                guessed = mimetypes.guess_type(filename)[0] if filename else None
                content_type = guessed or "application/octet-stream"

            # Plain file objects (``BytesIO``, ``open(...)``) have no
            # ``headers`` attribute; fall back to an empty set of headers
            # instead of failing with ``AttributeError``.
            headers = getattr(value, "headers", None)

            if headers is None:
                headers = Headers()

            headers.update([("Content-Type", content_type)])

            if filename is None:
                write_binary(encoder.send_event(Field(name=key, headers=headers)))
            else:
                write_binary(
                    encoder.send_event(
                        File(name=key, filename=filename, headers=headers)
                    )
                )

            while True:
                chunk = reader(16384)

                if not chunk:
                    # An empty read marks the end of this part.
                    write_binary(encoder.send_event(Data(data=chunk, more_data=False)))
                    break

                write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
        else:
            if not isinstance(value, str):
                value = str(value)

            write_binary(encoder.send_event(Field(name=key, headers=Headers())))
            write_binary(encoder.send_event(Data(data=value.encode(), more_data=False)))

    write_binary(encoder.send_event(Epilogue(data=b"")))

    length = stream.tell()
    stream.seek(0)
    return stream, length, boundary
def encode_multipart(
    values: t.Mapping[str, t.Any], boundary: str | None = None
) -> tuple[str, bytes]:
    """In-memory variant of `stream_encode_multipart`: the body is built
    without a temporary file and returned as bytes, in a tuple of the form
    (``boundary``, ``data``).

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.
    """
    body_stream, _body_length, used_boundary = stream_encode_multipart(
        values, use_tempfile=False, boundary=boundary
    )
    payload = body_stream.read()
    return used_boundary, payload
def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[tuple[str, t.Any]]:
    """Yield every ``(key, value)`` pair of a mapping whose values may be
    lists of values. Similar to iter_multi_items, except that only lists
    are expanded, never tuples, so tuples remain available to describe
    files.
    """
    if isinstance(data, MultiDict):
        yield from data.items(multi=True)
        return

    for name, entry in data.items():
        if not isinstance(entry, list):
            yield name, entry
            continue

        for item in entry:
            yield name, item
class EnvironBuilder:
    """This class can be used to conveniently create a WSGI environment
    for testing purposes.  It can be used to quickly create WSGI environments
    or request objects from arbitrary data.

    The signature of this class is also used in some other places as of
    Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
    :meth:`Client.open`).  Because of this most of the functionality is
    available through the constructor alone.

    :param path: the path of the request.  In the WSGI environment this will
                 end up as `PATH_INFO`.  If the `query_string` is not defined
                 and there is a question mark in the `path` everything after
                 it is used as query string.
    :param base_url: the base URL is a URL that is used to extract the WSGI
                     URL scheme, host (server name + server port) and the
                     script root (`SCRIPT_NAME`).
    :param query_string: A :class:`dict` or :class:`.MultiDict` to encode as the
        query string of the URL, which sets :attr:`args`. Or a string, which
        sets :attr:`query_string`, in which case :attr:`args` cannot be used.
    :param method: the HTTP method to use, defaults to `GET`.
    :param content_type: The content type for the request.  As of 0.5 you
                         don't have to provide this when specifying files
                         and form data via `data`.
    :param content_length: The content length for the request.  You don't
                           have to specify this when providing data via
                           `data`.
    :param errors_stream: an optional error stream that is used for
                          `wsgi.errors`.  Defaults to :data:`stderr`.
    :param multithread: controls `wsgi.multithread`.  Defaults to `False`.
    :param multiprocess: controls `wsgi.multiprocess`.  Defaults to `False`.
    :param run_once: controls `wsgi.run_once`.  Defaults to `False`.
    :param headers: an optional list or :class:`Headers` object of headers.
    :param data: A dict of form and file data to encode as the body of the
        request; file values can be an IO object, ``(stream, filename)``,
        ``(stream, filename, content type)``, or :class:`.FileStorage`.
        Alternatively, pass raw bytes to set as :attr:`input_stream`, or an IO
        object to read and set. ``json`` can be used to set JSON data instead.
        ``content_length`` is set automatically.
    :param json: An object to be serialized and assigned to ``data``.
        Defaults the content type to ``"application/json"``.
        Serialized with the function assigned to :attr:`json_dumps`.
    :param environ_base: an optional dict of environment defaults.
    :param environ_overrides: an optional dict of environment overrides.
    :param mimetype: The mimetype for the request. Assigned to
        :attr:`mimetype`, which sets the ``Content-Type`` header.
    :param auth: An authorization object to use for the
        ``Authorization`` header value. A ``(username, password)`` tuple
        is a shortcut for ``Basic`` authorization.
    :param input_stream: An IO object to pass through as the body of the
        request, without reading, which simulates a streaming request. The
        stream is not closed when calling :meth:`.close`, as it must remain open
        to be read in the application.

    .. versionchanged:: 3.2
        Can be used as a ``with`` context manager to automatically close
        any open files in :attr:`files`.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.

    .. versionchanged:: 2.1
        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
        header keys in the environ.

    .. versionchanged:: 2.0
        ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
        the query string, not only the path.

    .. versionchanged:: 2.0
        The default :attr:`request_class` is ``Request`` instead of
        ``BaseRequest``.

    .. versionadded:: 2.0
       Added the ``auth`` parameter.

    .. versionadded:: 0.15
        The ``json`` param and :meth:`json_dumps` method.

    .. versionadded:: 0.15
        The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
        the path before percent-decoding. This is not part of the WSGI
        PEP, but many WSGI servers include it.

    .. versionchanged:: 0.6
       ``path`` and ``base_url`` can now be unicode strings that are
       encoded with :func:`iri_to_uri`.
    """

    #: the server protocol to use.  defaults to HTTP/1.1
    server_protocol = "HTTP/1.1"

    #: the wsgi version to use.  defaults to (1, 0)
    wsgi_version = (1, 0)

    #: The default request class used by :meth:`get_request`.
    request_class = Request

    #: The serialization function used when ``json`` is passed.
    json_dumps = staticmethod(json.dumps)

    # Class-level fallbacks so that ``close`` (and therefore ``__del__``) can
    # still run on an instance whose ``__init__`` did not complete. They must
    # never be mutated: ``__init__`` rebinds per-instance containers first.
    _args: MultiDict[str, str] = MultiDict()
    _query_string: str | None = None
    _form: MultiDict[str, str] = MultiDict()
    _files: FileMultiDict = FileMultiDict()
    _input_stream: t.IO[bytes] | None = None

    def __init__(
        self,
        path: str = "/",
        base_url: str | None = None,
        query_string: t.Mapping[str, str] | str | None = None,
        method: str = "GET",
        input_stream: t.IO[bytes] | None = None,
        content_type: str | None = None,
        content_length: int | None = None,
        errors_stream: t.IO[str] | None = None,
        multithread: bool = False,
        multiprocess: bool = False,
        run_once: bool = False,
        headers: Headers | t.Iterable[tuple[str, str]] | None = None,
        data: t.IO[bytes] | str | bytes | t.Mapping[str, t.Any] | None = None,
        environ_base: t.Mapping[str, t.Any] | None = None,
        environ_overrides: t.Mapping[str, t.Any] | None = None,
        mimetype: str | None = None,
        json: t.Mapping[str, t.Any] | None = None,
        auth: Authorization | tuple[str, str] | None = None,
    ) -> None:
        # Every builder needs its own containers. The property setters used
        # below call ``.clear()`` on them and ``data`` is appended to them;
        # doing that on the shared class-level defaults would wipe or pollute
        # the form/files of every other live builder.
        self._args = MultiDict()
        self._form = MultiDict()
        self._files = FileMultiDict()

        if query_string is not None and "?" in path:
            raise ValueError("Query string is defined in the path and as an argument")
        request_uri = urlsplit(path)
        if query_string is None and "?" in path:
            query_string = request_uri.query

        self.path = iri_to_uri(request_uri.path)
        self.request_uri = path
        if base_url is not None:
            base_url = iri_to_uri(base_url)
        self.base_url = base_url
        if isinstance(query_string, str):
            self.query_string = query_string
        else:
            if query_string is None:
                query_string = MultiDict()
            elif not isinstance(query_string, MultiDict):
                query_string = MultiDict(query_string)
            self.args = query_string
        self.method = method
        if headers is None:
            headers = Headers()
        elif not isinstance(headers, Headers):
            headers = Headers(headers)
        self.headers = headers
        if content_type is not None:
            self.content_type = content_type
        if errors_stream is None:
            errors_stream = sys.stderr
        self.errors_stream = errors_stream
        self.multithread = multithread
        self.multiprocess = multiprocess
        self.run_once = run_once
        self.environ_base = environ_base
        self.environ_overrides = environ_overrides
        self.input_stream = input_stream
        self.content_length = content_length

        if auth is not None:
            if isinstance(auth, tuple):
                auth = Authorization(
                    "basic", {"username": auth[0], "password": auth[1]}
                )

            self.headers.set("Authorization", auth.to_header())

        if json is not None:
            if data is not None:
                raise TypeError("can't provide both json and data")

            data = self.json_dumps(json)

            if self.content_type is None:
                self.content_type = "application/json"

        if data:
            if input_stream is not None:
                raise TypeError("can't provide input stream and data")
            if hasattr(data, "read"):
                data = data.read()
            if isinstance(data, str):
                data = data.encode()
            if isinstance(data, bytes):
                self.input_stream = BytesIO(data)
                if self.content_length is None:
                    self.content_length = len(data)
            else:
                # A mapping: split values into file parts and text fields.
                for key, value in _iter_data(data):
                    if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
                        self._add_file_from_data(key, value)
                    else:
                        self.form.setlistdefault(key).append(value)

        if mimetype is not None:
            self.mimetype = mimetype

    @classmethod
    def from_environ(cls, environ: WSGIEnvironment, **kwargs: t.Any) -> te.Self:
        """Turn an environ dict back into a builder. Any extra kwargs
        override the args extracted from the environ.

        .. versionchanged:: 2.0
            Path and query values are passed through the WSGI decoding
            dance to avoid double encoding.

        .. versionadded:: 0.15
        """
        headers = Headers(EnvironHeaders(environ))
        out = {
            "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
            "base_url": cls._make_base_url(
                environ["wsgi.url_scheme"],
                headers.pop("Host"),
                _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
            ),
            "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
            "method": environ["REQUEST_METHOD"],
            "input_stream": environ["wsgi.input"],
            "content_type": headers.pop("Content-Type", None),
            "content_length": headers.pop("Content-Length", None),
            "errors_stream": environ["wsgi.errors"],
            "multithread": environ["wsgi.multithread"],
            "multiprocess": environ["wsgi.multiprocess"],
            "run_once": environ["wsgi.run_once"],
            "headers": headers,
        }
        out.update(kwargs)
        return cls(**out)

    def _add_file_from_data(
        self,
        key: str,
        value: (t.IO[bytes] | tuple[t.IO[bytes], str] | tuple[t.IO[bytes], str, str]),
    ) -> None:
        """Called in the EnvironBuilder to add files from the data dict."""
        if isinstance(value, tuple):
            self.files.add_file(key, *value)
        else:
            self.files.add_file(key, value)

    @staticmethod
    def _make_base_url(scheme: str, host: str, script_root: str) -> str:
        """Join scheme, host and script root into a URL that always ends
        with exactly one trailing slash.
        """
        return urlunsplit((scheme, host, script_root, "", "")).rstrip("/") + "/"

    @property
    def base_url(self) -> str:
        """The base URL is used to extract the URL scheme, host name,
        port, and root path.
        """
        return self._make_base_url(self.url_scheme, self.host, self.script_root)

    @base_url.setter
    def base_url(self, value: str | None) -> None:
        if value is None:
            scheme = "http"
            netloc = "localhost"
            script_root = ""
        else:
            scheme, netloc, script_root, qs, anchor = urlsplit(value)
            if qs or anchor:
                raise ValueError("base url must not contain a query string or fragment")
        self.script_root = script_root.rstrip("/")
        self.host = netloc
        self.url_scheme = scheme

    @property
    def content_type(self) -> str | None:
        """The content type for the request.  Reflected from and to
        the :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        ct = self.headers.get("Content-Type")
        if ct is None and not self._input_stream:
            # No explicit header and no raw body: infer from files/form.
            if self._files:
                return "multipart/form-data"
            if self._form:
                return "application/x-www-form-urlencoded"
            return None
        return ct

    @content_type.setter
    def content_type(self, value: str | None) -> None:
        if value is None:
            self.headers.pop("Content-Type", None)
        else:
            self.headers["Content-Type"] = value

    @property
    def mimetype(self) -> str | None:
        """The mimetype (content type without charset etc.)

        .. versionadded:: 0.14
        """
        ct = self.content_type
        return ct.split(";")[0].strip() if ct else None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        self.content_type = get_content_type(value, "utf-8")

    @property
    def mimetype_params(self) -> t.Mapping[str, str]:
        """The mimetype parameters as dict.  For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.14
        """

        def on_update(d: CallbackDict[str, str]) -> None:
            # Write changes made to the returned dict back to the header.
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        d = parse_options_header(self.headers.get("Content-Type", ""))[1]
        return CallbackDict(d, on_update)

    @property
    def content_length(self) -> int | None:
        """The content length as integer.  Reflected from and to the
        :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        return self.headers.get("Content-Length", type=int)

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        if value is None:
            self.headers.pop("Content-Length", None)
        else:
            self.headers["Content-Length"] = str(value)

    @property
    def form(self) -> MultiDict[str, str]:
        """Form data text values. File values are stored in :attr:`files`.

        If any values are set and no files are set, the request body will be
        encoded and the content type set to ``application/x-www-form-urlencoded``.

        Set when the constructor ``data`` parameter is a dict or multidict.

        Cannot be accessed if :attr:`input_stream` is set. Setting this will
        unset :attr:`input_stream`.
        """
        if self.input_stream is not None:
            raise AttributeError("Not available when 'input_stream' is set.")

        return self._form

    @form.setter
    def form(self, value: MultiDict[str, str]) -> None:
        self._input_stream = None
        self._form = value

    @property
    def files(self) -> FileMultiDict:
        """Form data file values. Text values are stored in :attr:`form`.

        If any values are set, the request body will be encoded and the content
        type set to ``multipart/form-data``.

        Set when the constructor ``data`` parameter is a dict or multidict.

        The :meth:`.FileMultiDict.add_file` method provides a convenient way to
        add more files without needing to construct :class:`.FileStorage`
        objects.

        The file streams will be read when encoding the data. All streams will
        be closed when the builder's :meth:`.close` method is called.

        Cannot be accessed if :attr:`input_stream` is set. Setting this will
        unset :attr:`input_stream`.

        Setting this will _not_ close files in the previous dict, call
        :meth:`.FileMultiDict.close` first if that's needed.
        """
        if self.input_stream is not None:
            raise AttributeError("Not available when 'input_stream' is set.")

        return self._files

    @files.setter
    def files(self, value: FileMultiDict) -> None:
        self._input_stream = None
        self._files = value

    @property
    def input_stream(self) -> t.IO[bytes] | None:
        """A binary IO object to pass through as the body of the request,
        without reading, which simulates a streaming request.

        If this is set, :attr:`form` and :attr:`files` cannot be accessed.
        If those are set, this will be ``None``. Setting this will close any
        files and clear those.

        The stream is not closed when calling the builder's :meth:`close`
        method, as it must remain open to be read in the application.

        .. versionchanged:: 3.2
            Any values in :attr:`files` are closed first when setting this.
        """
        return self._input_stream

    @input_stream.setter
    def input_stream(self, value: t.IO[bytes] | None) -> None:
        self._form.clear()
        # NOTE(review): the docstring says files are closed when this is set,
        # but only ``clear()`` is called here -- confirm that
        # ``FileMultiDict.clear`` closes the files it drops.
        self._files.clear()
        self._input_stream = value

    @property
    def query_string(self) -> str:
        """The URL query string.

        If this is set to a string, :attr:`args` cannot be accessed. If
        :attr:`args` is set, this will be the encoded value of that. If neither
        is set, this is the empty string.
        """
        if self._query_string is None:
            if self._args is not None:
                return _urlencode(self._args)

            return ""

        return self._query_string

    @query_string.setter
    def query_string(self, value: str | None) -> None:
        self._args.clear()
        self._query_string = value

    @property
    def args(self) -> MultiDict[str, str]:
        """The URL query string as a :class:`MultiDict`.

        Set when the constructor ``query_string`` parameter is a dict or
        multidict.

        Setting this will unset :attr:`query_string`.
        """
        if self._query_string is not None:
            raise AttributeError("Not available when 'query_string' is set.")

        return self._args

    @args.setter
    def args(self, value: MultiDict[str, str]) -> None:
        self._query_string = None
        self._args = value

    @property
    def server_name(self) -> str:
        """The server name (read-only, use :attr:`host` to set)"""
        return self.host.split(":", 1)[0]

    @property
    def server_port(self) -> int:
        """The server port as integer (read-only, use :attr:`host` to set)"""
        pieces = self.host.split(":", 1)

        if len(pieces) == 2:
            try:
                return int(pieces[1])
            except ValueError:
                # Not a numeric port; fall back to the scheme default below.
                pass

        if self.url_scheme == "https":
            return 443
        return 80

    def __del__(self) -> None:
        """Close any open files when the builder is garbage collected."""
        self.close()

    def __enter__(self) -> te.Self:
        """Return the builder itself for use in a ``with`` block."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close any open files when leaving the ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close all open files in :attr:`files`. :attr:`input_stream` is not
        closed, as it is assumed to be managed externally.
        """
        self._files.close()

    def get_environ(self) -> WSGIEnvironment:
        """Return the built environ.

        .. versionchanged:: 0.15
            The content type and length headers are set based on
            input stream detection. Previously this only set the WSGI
            keys.
        """
        input_stream = self.input_stream
        content_length = self.content_length

        mimetype = self.mimetype
        content_type = self.content_type

        if input_stream is not None:
            # Measure the remaining bytes without consuming the stream.
            start_pos = input_stream.tell()
            input_stream.seek(0, 2)
            end_pos = input_stream.tell()
            input_stream.seek(start_pos)
            content_length = end_pos - start_pos
        elif mimetype == "multipart/form-data":
            input_stream, content_length, boundary = stream_encode_multipart(
                CombinedMultiDict([self.form, self.files])
            )
            content_type = f'{mimetype}; boundary="{boundary}"'
        elif mimetype == "application/x-www-form-urlencoded":
            form_encoded = _urlencode(self.form).encode("ascii")
            content_length = len(form_encoded)
            input_stream = BytesIO(form_encoded)
        else:
            input_stream = BytesIO()

        result: WSGIEnvironment = {}
        if self.environ_base:
            result.update(self.environ_base)

        def _path_encode(x: str) -> str:
            return _wsgi_encoding_dance(unquote(x))

        raw_uri = _wsgi_encoding_dance(self.request_uri)
        result.update(
            {
                "REQUEST_METHOD": self.method,
                "SCRIPT_NAME": _path_encode(self.script_root),
                "PATH_INFO": _path_encode(self.path),
                "QUERY_STRING": _wsgi_encoding_dance(self.query_string),
                # Non-standard, added by mod_wsgi, uWSGI
                "REQUEST_URI": raw_uri,
                # Non-standard, added by gunicorn
                "RAW_URI": raw_uri,
                "SERVER_NAME": self.server_name,
                "SERVER_PORT": str(self.server_port),
                "HTTP_HOST": self.host,
                "SERVER_PROTOCOL": self.server_protocol,
                "wsgi.version": self.wsgi_version,
                "wsgi.url_scheme": self.url_scheme,
                "wsgi.input": input_stream,
                "wsgi.errors": self.errors_stream,
                "wsgi.multithread": self.multithread,
                "wsgi.multiprocess": self.multiprocess,
                "wsgi.run_once": self.run_once,
            }
        )

        headers = self.headers.copy()
        # Don't send these as headers, they're part of the environ.
        headers.remove("Content-Type")
        headers.remove("Content-Length")

        if content_type is not None:
            result["CONTENT_TYPE"] = content_type

        if content_length is not None:
            result["CONTENT_LENGTH"] = str(content_length)

        # Repeated headers are folded into one comma-separated value.
        combined_headers = defaultdict(list)

        for key, value in headers.to_wsgi_list():
            combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)

        for key, values in combined_headers.items():
            result[key] = ", ".join(values)

        if self.environ_overrides:
            result.update(self.environ_overrides)

        return result

    def get_request(self, cls: type[Request] | None = None) -> Request:
        """Returns a request with the data.  If the request class is not
        specified :attr:`request_class` is used.

        :param cls: The request wrapper to use.
        """
        if cls is None:
            cls = self.request_class

        return cls(self.get_environ())
class ClientRedirectError(Exception):
    """If a redirect loop is detected when using follow_redirects=True with
    the :class:`Client`, then this exception is raised.
    """
775class Client:
776 """Simulate sending requests to a WSGI application without running a WSGI or HTTP
777 server.
779 :param application: The WSGI application to make requests to.
780 :param response_wrapper: A :class:`.Response` class to wrap response data with.
781 Defaults to :class:`.TestResponse`. If it's not a subclass of ``TestResponse``,
782 one will be created.
783 :param use_cookies: Persist cookies from ``Set-Cookie`` response headers to the
784 ``Cookie`` header in subsequent requests. Domain and path matching is supported,
785 but other cookie parameters are ignored.
786 :param allow_subdomain_redirects: Allow requests to follow redirects to subdomains.
787 Enable this if the application handles subdomains and redirects between them.
789 .. versionchanged:: 2.3
790 Simplify cookie implementation, support domain and path matching.
792 .. versionchanged:: 2.1
793 All data is available as properties on the returned response object. The
794 response cannot be returned as a tuple.
796 .. versionchanged:: 2.0
797 ``response_wrapper`` is always a subclass of :class:``TestResponse``.
799 .. versionchanged:: 0.5
800 Added the ``use_cookies`` parameter.
801 """
803 def __init__(
804 self,
805 application: WSGIApplication,
806 response_wrapper: type[Response] | None = None,
807 use_cookies: bool = True,
808 allow_subdomain_redirects: bool = False,
809 ) -> None:
810 self.application = application
812 if response_wrapper in {None, Response}:
813 response_wrapper = TestResponse
814 elif response_wrapper is not None and not issubclass(
815 response_wrapper, TestResponse
816 ):
817 response_wrapper = type(
818 "WrapperTestResponse",
819 (TestResponse, response_wrapper),
820 {},
821 )
823 self.response_wrapper = t.cast(type["TestResponse"], response_wrapper)
825 if use_cookies:
826 self._cookies: dict[tuple[str, str, str], Cookie] | None = {}
827 else:
828 self._cookies = None
830 self.allow_subdomain_redirects = allow_subdomain_redirects
832 def get_cookie(
833 self, key: str, domain: str = "localhost", path: str = "/"
834 ) -> Cookie | None:
835 """Return a :class:`.Cookie` if it exists. Cookies are uniquely identified by
836 ``(domain, path, key)``.
838 :param key: The decoded form of the key for the cookie.
839 :param domain: The domain the cookie was set for.
840 :param path: The path the cookie was set for.
842 .. versionadded:: 2.3
843 """
844 if self._cookies is None:
845 raise TypeError(
846 "Cookies are disabled. Create a client with 'use_cookies=True'."
847 )
849 return self._cookies.get((domain, path, key))
851 def set_cookie(
852 self,
853 key: str,
854 value: str = "",
855 *,
856 domain: str = "localhost",
857 origin_only: bool = True,
858 path: str = "/",
859 **kwargs: t.Any,
860 ) -> None:
861 """Set a cookie to be sent in subsequent requests.
863 This is a convenience to skip making a test request to a route that would set
864 the cookie. To test the cookie, make a test request to a route that uses the
865 cookie value.
867 The client uses ``domain``, ``origin_only``, and ``path`` to determine which
868 cookies to send with a request. It does not use other cookie parameters that
869 browsers use, since they're not applicable in tests.
871 :param key: The key part of the cookie.
872 :param value: The value part of the cookie.
873 :param domain: Send this cookie with requests that match this domain. If
874 ``origin_only`` is true, it must be an exact match, otherwise it may be a
875 suffix match.
876 :param origin_only: Whether the domain must be an exact match to the request.
877 :param path: Send this cookie with requests that match this path either exactly
878 or as a prefix.
879 :param kwargs: Passed to :func:`.dump_cookie`.
881 .. versionchanged:: 3.0
882 The parameter ``server_name`` is removed. The first parameter is
883 ``key``. Use the ``domain`` and ``origin_only`` parameters instead.
885 .. versionchanged:: 2.3
886 The ``origin_only`` parameter was added.
888 .. versionchanged:: 2.3
889 The ``domain`` parameter defaults to ``localhost``.
890 """
891 if self._cookies is None:
892 raise TypeError(
893 "Cookies are disabled. Create a client with 'use_cookies=True'."
894 )
896 cookie = Cookie._from_response_header(
897 domain, "/", dump_cookie(key, value, domain=domain, path=path, **kwargs)
898 )
899 cookie.origin_only = origin_only
901 if cookie._should_delete:
902 self._cookies.pop(cookie._storage_key, None)
903 else:
904 self._cookies[cookie._storage_key] = cookie
906 def delete_cookie(
907 self,
908 key: str,
909 *,
910 domain: str = "localhost",
911 path: str = "/",
912 ) -> None:
913 """Delete a cookie if it exists. Cookies are uniquely identified by
914 ``(domain, path, key)``.
916 :param key: The decoded form of the key for the cookie.
917 :param domain: The domain the cookie was set for.
918 :param path: The path the cookie was set for.
920 .. versionchanged:: 3.0
921 The ``server_name`` parameter is removed. The first parameter is
922 ``key``. Use the ``domain`` parameter instead.
924 .. versionchanged:: 3.0
925 The ``secure``, ``httponly`` and ``samesite`` parameters are removed.
927 .. versionchanged:: 2.3
928 The ``domain`` parameter defaults to ``localhost``.
929 """
930 if self._cookies is None:
931 raise TypeError(
932 "Cookies are disabled. Create a client with 'use_cookies=True'."
933 )
935 self._cookies.pop((domain, path, key), None)
937 def _add_cookies_to_wsgi(self, environ: WSGIEnvironment) -> None:
938 """If cookies are enabled, set the ``Cookie`` header in the environ to the
939 cookies that are applicable to the request host and path.
941 :meta private:
943 .. versionadded:: 2.3
944 """
945 if self._cookies is None:
946 return
948 url = urlsplit(get_current_url(environ))
949 server_name = url.hostname or "localhost"
950 value = "; ".join(
951 c._to_request_header()
952 for c in self._cookies.values()
953 if c._matches_request(server_name, url.path)
954 )
956 if value:
957 environ["HTTP_COOKIE"] = value
958 else:
959 environ.pop("HTTP_COOKIE", None)
def _update_cookies_from_response(
    self, server_name: str, path: str, headers: list[str]
) -> None:
    """Apply a response's ``Set-Cookie`` headers to the stored cookies.

    Does nothing when cookies are disabled. Each header is parsed into a
    :class:`Cookie`; a cookie flagged for deletion is removed from storage,
    any other cookie is stored under its storage key, replacing an existing
    entry with the same key.

    :meta private:

    .. versionadded:: 2.3
    """
    jar = self._cookies

    if jar is None:
        return

    for raw_header in headers:
        parsed = Cookie._from_response_header(server_name, path, raw_header)

        if parsed._should_delete:
            jar.pop(parsed._storage_key, None)
            continue

        jar[parsed._storage_key] = parsed
def run_wsgi_app(
    self, environ: WSGIEnvironment, buffered: bool = False
) -> tuple[t.Iterable[bytes], str, Headers]:
    """Run the wrapped WSGI application with cookie handling around it.

    Applicable cookies are added to ``environ`` before the call, and any
    ``Set-Cookie`` headers in the result are recorded afterwards.

    :meta private:
    """
    self._add_cookies_to_wsgi(environ)
    result = run_wsgi_app(self.application, environ, buffered=buffered)

    request_url = urlsplit(get_current_url(environ))
    host = request_url.hostname or "localhost"
    set_cookie_headers = result[2].getlist("Set-Cookie")
    self._update_cookies_from_response(host, request_url.path, set_cookie_headers)

    return result
def resolve_redirect(
    self, response: TestResponse, buffered: bool = False
) -> TestResponse:
    """Issue the follow-up request for a redirect response.

    The new request is built from the environ of the request that produced
    ``response``, with the path and query string taken from the response's
    ``Location``.

    :raises RuntimeError: If the location points at a subdomain while
        ``allow_subdomain_redirects`` is false, or at any other host.

    :meta private:
    """
    target = urlsplit(response.location)
    path = target.path
    builder = EnvironBuilder.from_environ(
        response.request.environ, path=path, query_string=target.query
    )

    # Read the current server name before ``builder.host`` is assigned
    # below; presumably assigning the host changes ``server_name``.
    current_labels = builder.server_name.split(".")
    target_labels = target.netloc.split(":", 1)[0].split(".")

    # A local redirect with autocorrect_location_header=False doesn't have
    # a host; in that case the request's own host is kept and no
    # cross-host check is needed.
    if target_labels != [""]:
        # The new location has a host, use it for the base URL.
        builder.url_scheme = target.scheme
        builder.host = target.netloc

        # Explain why a redirect to a different server name won't be
        # followed.
        if target_labels != current_labels:
            if target_labels[-len(current_labels) :] != current_labels:
                raise RuntimeError("Following external redirects is not supported.")

            if not self.allow_subdomain_redirects:
                raise RuntimeError("Following subdomain redirects is not enabled.")

    script_root = builder.script_root
    root_parts = script_root.split("/")

    if path.split("/")[: len(root_parts)] == root_parts:
        # Strip the script root from the path.
        builder.path = path[len(script_root) :]
    else:
        # The new location is not under the script root, so use the
        # whole path and clear the previous root.
        builder.path = path
        builder.script_root = ""

    # Certain statuses switch to GET in some cases
    # https://fetch.spec.whatwg.org/#http-redirect-fetch
    status = response.status_code
    method = builder.method

    if (status in {301, 302} and method == "POST") or (
        status == 303 and method not in {"GET", "HEAD"}
    ):
        builder.method = "GET"
        stream = builder.input_stream

        if stream is not None:
            stream.close()

        builder.input_stream = None  # also closes and clears form and files
        builder.content_type = None
        builder.content_length = None

        # The body is gone, so drop the headers that described it.
        for header_name in (
            "Content-Encoding",
            "Content-Language",
            "Content-Location",
            "Transfer-Encoding",
        ):
            builder.headers.pop(header_name, None)

    return self.open(builder, buffered=buffered)
def open(
    self,
    *args: t.Any,
    buffered: bool = False,
    follow_redirects: bool = False,
    **kwargs: t.Any,
) -> TestResponse:
    """Build a request from the arguments, run it against the
    application, and return the wrapped response.

    :param args: Passed to :class:`EnvironBuilder` to create the
        environ for the request. If a single arg is passed, it can
        be an existing :class:`EnvironBuilder` or an environ dict.
    :param buffered: Convert the iterator returned by the app into
        a list. If the iterator has a ``close()`` method, it is
        called automatically.
    :param follow_redirects: Make additional requests to follow HTTP
        redirects until a non-redirect status is returned.
        :attr:`TestResponse.history` lists the intermediate
        responses.
    :raises ClientRedirectError: If the same location and status code
        are seen twice while following redirects.

    .. versionchanged:: 2.1
        Removed the ``as_tuple`` parameter.

    .. versionchanged:: 2.0
        The request input stream is closed when calling
        ``response.close()``. Input streams for redirects are
        automatically closed.

    .. versionchanged:: 0.5
        If a dict is provided as file in the dict for the ``data``
        parameter the content type has to be called ``content_type``
        instead of ``mimetype``. This change was made for
        consistency with :class:`werkzeug.FileWrapper`.

    .. versionchanged:: 0.5
        Added the ``follow_redirects`` parameter.
    """
    request: Request | None = None

    # A lone positional argument may already describe the whole request.
    if len(args) == 1 and not kwargs:
        only_arg = args[0]

        if isinstance(only_arg, EnvironBuilder):
            request = only_arg.get_request()
        elif isinstance(only_arg, dict):
            with EnvironBuilder.from_environ(only_arg) as env_builder:
                request = env_builder.get_request()
        elif isinstance(only_arg, Request):
            request = only_arg

    if request is None:
        with EnvironBuilder(*args, **kwargs) as env_builder:
            request = env_builder.get_request()

    app_iter, status, headers = self.run_wsgi_app(request.environ, buffered=buffered)
    response = self.response_wrapper(app_iter, status, headers, request=request)

    if not follow_redirects:
        return response

    seen_redirects = set()
    history: list[TestResponse] = []

    while response.status_code in {301, 302, 303, 307, 308}:
        # Exhaust intermediate response bodies to ensure middleware
        # that returns an iterator runs any cleanup code.
        if not buffered:
            response.make_sequence()
            response.close()

        redirect_entry = (response.location, response.status_code)

        if redirect_entry in seen_redirects:
            raise ClientRedirectError(
                f"Loop detected: A {response.status_code} redirect"
                f" to {response.location} was already made."
            )

        seen_redirects.add(redirect_entry)
        response.history = tuple(history)
        history.append(response)
        response = self.resolve_redirect(response, buffered=buffered)

    # This is the final request after redirects. The loop above only exits
    # normally or by raising, so this always runs once it finishes.
    response.history = tuple(history)
    # Close the input stream when closing the response, in case
    # the input is an open temporary file.
    response.call_on_close(request.input_stream.close)
    return response
def get(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Call :meth:`open`, forcing ``method`` to ``GET``."""
    return self.open(*args, **{**kw, "method": "GET"})
def post(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Call :meth:`open`, forcing ``method`` to ``POST``."""
    return self.open(*args, **{**kw, "method": "POST"})
def put(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Call :meth:`open`, forcing ``method`` to ``PUT``."""
    return self.open(*args, **{**kw, "method": "PUT"})
def delete(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Call :meth:`open`, forcing ``method`` to ``DELETE``."""
    return self.open(*args, **{**kw, "method": "DELETE"})
def patch(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Call :meth:`open`, forcing ``method`` to ``PATCH``."""
    return self.open(*args, **{**kw, "method": "PATCH"})
def options(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Call :meth:`open`, forcing ``method`` to ``OPTIONS``."""
    return self.open(*args, **{**kw, "method": "OPTIONS"})
def head(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Call :meth:`open`, forcing ``method`` to ``HEAD``."""
    return self.open(*args, **{**kw, "method": "HEAD"})
def trace(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Call :meth:`open`, forcing ``method`` to ``TRACE``."""
    return self.open(*args, **{**kw, "method": "TRACE"})
def __repr__(self) -> str:
    """Return ``<ClassName application-repr>`` for debugging output."""
    class_name = type(self).__name__
    return "<{} {!r}>".format(class_name, self.application)
def create_environ(*args: t.Any, **kwargs: t.Any) -> WSGIEnvironment:
    """Build a new WSGI environ dict from the given values.

    The first parameter should be the path of the request which defaults
    to '/'. The second one can either be an absolute path (in that case the
    host is localhost:80) or a full path to the request with scheme, netloc
    port and the path to the script.

    This accepts the same arguments as the :class:`EnvironBuilder`
    constructor.

    .. versionchanged:: 0.5
       This function is now a thin wrapper over :class:`EnvironBuilder` which
       was added in 0.5.  The `headers`, `environ_base`, `environ_overrides`
       and `charset` parameters were added.
    """
    with EnvironBuilder(*args, **kwargs) as env_builder:
        environ = env_builder.get_environ()

    return environ
def run_wsgi_app(
    app: WSGIApplication, environ: WSGIEnvironment, buffered: bool = False
) -> tuple[t.Iterable[bytes], str, Headers]:
    """Return a tuple in the form (app_iter, status, headers) of the
    application output.  This works best if you pass it an application that
    returns an iterator all the time.

    Sometimes applications may use the `write()` callable returned
    by the `start_response` function.  This tries to resolve such edge
    cases automatically.  But if you don't get the expected output you
    should set `buffered` to `True` which enforces buffering.

    Data passed to `write()` is included in the returned body in both
    modes, in the order it was produced relative to the items the
    application yields.

    If passed an invalid WSGI application the behavior of this function is
    undefined.  Never pass non-conforming WSGI applications to this function.

    :param app: the application to execute.
    :param environ: the WSGI environ for the request. It is copied before
        being passed to the application.
    :param buffered: set to `True` to enforce buffering.
    :return: tuple in the form ``(app_iter, status, headers)``
    """
    # Copy environ to ensure any mutations by the app (ProxyFix, for
    # example) don't affect subsequent requests (such as redirects).
    environ = _get_environ(environ).copy()
    status: str
    response: tuple[str, list[tuple[str, str]]] | None = None
    # Collects everything sent through ``write()`` as well as the items
    # consumed from the application iterator below.
    buffer: list[bytes] = []

    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal response

        if exc_info:
            try:
                raise exc_info[1].with_traceback(exc_info[2])
            finally:
                # Drop the reference to the traceback once it was raised.
                exc_info = None

        response = (status, headers)
        # The legacy ``write()`` callable appends to the shared buffer.
        return buffer.append

    app_rv = app(environ, start_response)
    close_func = getattr(app_rv, "close", None)
    app_iter: t.Iterable[bytes] = iter(app_rv)

    # when buffering we emit the close call early and convert the
    # application iterator into a regular list
    if buffered:
        try:
            # Append to the shared buffer instead of building a separate
            # list, so data the application sent through ``write()`` is
            # kept rather than silently dropped.
            for item in app_iter:
                buffer.append(item)

            app_iter = buffer
        finally:
            if close_func is not None:
                close_func()

    # otherwise we iterate the application iter until we have a response, chain
    # the already received data with the already collected data and wrap it in
    # a new `ClosingIterator` if we need to restore a `close` callable from the
    # original return value.
    else:
        for item in app_iter:
            buffer.append(item)

            if response is not None:
                break

        if buffer:
            app_iter = chain(buffer, app_iter)

        if close_func is not None and app_iter is not app_rv:
            app_iter = ClosingIterator(app_iter, close_func)

    status, headers = response  # type: ignore
    return app_iter, status, Headers(headers)
class TestResponse(Response):
    """:class:`~werkzeug.wrappers.Response` subclass that provides extra
    information about requests made with the test :class:`Client`.

    Test client requests will always return an instance of this class.
    If a custom response class is passed to the client, it is
    subclassed along with this to support test information.

    If the test request included large files, or if the application is
    serving a file, call :meth:`close` to close any open files and
    prevent Python showing a ``ResourceWarning``.

    .. versionchanged:: 2.2
        Set the ``default_mimetype`` to None to prevent a mimetype being
        assumed if missing.

    .. versionchanged:: 2.1
        Response instances cannot be treated as tuples.

    .. versionadded:: 2.0
        Test client methods always return instances of this class.
    """

    # Don't assume a mimetype, instead use whatever the response provides
    default_mimetype = None

    request: Request
    """A request object with the environ used to make the request that
    resulted in this response.
    """

    history: tuple[TestResponse, ...]
    """A tuple of intermediate responses. Populated when the test request
    is made with ``follow_redirects`` enabled.
    """

    # Tell Pytest to ignore this, it's not a test class.
    __test__ = False

    def __init__(
        self,
        response: t.Iterable[bytes],
        status: str,
        headers: Headers,
        request: Request,
        history: tuple[TestResponse, ...] = (),
        **kwargs: t.Any,
    ) -> None:
        """Create the response and attach the test metadata.

        :param response: The body iterable returned by the application.
        :param status: The status line returned by the application.
        :param headers: The response headers.
        :param request: The request that produced this response.
        :param history: Intermediate responses that led to this one.
        :param kwargs: Passed on to the parent class constructor.
        """
        super().__init__(response, status, headers, **kwargs)
        self.request = request
        self.history = history
        self._compat_tuple = response, status, headers

    @cached_property
    def text(self) -> str:
        """The response data as text. A shortcut for
        ``response.get_data(as_text=True)``.

        .. versionadded:: 2.1
        """
        return self.get_data(as_text=True)
@dataclasses.dataclass
class Cookie:
    """A cookie key, value, and parameters.

    The class itself is not a public API. Its attributes are documented for inspection
    with :meth:`.Client.get_cookie` only.

    .. versionadded:: 2.3
    """

    key: str
    """The cookie key, encoded as a client would see it."""

    value: str
    """The cookie value, encoded as a client would see it."""

    decoded_key: str
    """The cookie key, decoded as the application would set and see it."""

    decoded_value: str
    """The cookie value, decoded as the application would set and see it."""

    expires: datetime | None
    """The time at which the cookie is no longer valid."""

    max_age: int | None
    """The number of seconds from when the cookie was set at which it is
    no longer valid.
    """

    domain: str
    """The domain that the cookie was set for, or the request domain if not set."""

    origin_only: bool
    """Whether the cookie will be sent for exact domain matches only. This is ``True``
    if the ``Domain`` parameter was not present.
    """

    path: str
    """The path that the cookie was set for."""

    secure: bool | None
    """The ``Secure`` parameter."""

    http_only: bool | None
    """The ``HttpOnly`` parameter."""

    same_site: str | None
    """The ``SameSite`` parameter."""

    def _matches_request(self, server_name: str, path: str) -> bool:
        """Tell whether this cookie applies to a request for ``server_name``
        and ``path``.

        The host must equal the cookie's domain or, unless the cookie is
        origin-only, be a dot-separated subdomain of it. The path must equal
        the cookie's path or lie below it at a ``/`` boundary.
        """
        if server_name != self.domain:
            if self.origin_only:
                return False

            if not server_name.endswith(self.domain):
                return False

            # Require a "." right before the domain suffix so that
            # "badexample.com" does not match "example.com".
            if not server_name[: -len(self.domain)].endswith("."):
                return False

        if path == self.path:
            return True

        if not path.startswith(self.path):
            return False

        # Step back one character when the cookie path already ends in "/",
        # so the remainder is checked from that slash.
        boundary = len(self.path) - self.path.endswith("/")
        return path[boundary:].startswith("/")

    def _to_request_header(self) -> str:
        """Return the ``key=value`` pair as sent in a ``Cookie`` header."""
        return "=".join((self.key, self.value))

    @classmethod
    def _from_response_header(cls, server_name: str, path: str, header: str) -> te.Self:
        """Build a cookie from one ``Set-Cookie`` header value.

        :param server_name: Used as the domain when the header has no
            ``Domain`` parameter.
        :param path: The request path; the part before its last ``/`` (or
            ``/``) is the default when the header has no ``Path`` parameter.
        :param header: The raw ``Set-Cookie`` header value.
        """
        pair, _, raw_params = header.partition(";")
        raw_key, _, raw_value = pair.partition("=")
        decoded_key, decoded_value = next(parse_cookie(pair).items())  # type: ignore[call-overload]
        # Flag-style parameters without "=" (e.g. ``Secure``) map to ``None``.
        params = {
            name.strip().lower(): (text.strip() if sep else None)
            for name, sep, text in (
                item.partition("=") for item in raw_params.split(";")
            )
        }

        return cls(
            key=raw_key.strip(),
            value=raw_value.strip(),
            decoded_key=decoded_key,
            decoded_value=decoded_value,
            expires=parse_date(params.get("expires")),
            max_age=int(params["max-age"] or 0) if "max-age" in params else None,
            domain=params.get("domain") or server_name,
            origin_only="domain" not in params,
            path=params.get("path") or path.rpartition("/")[0] or "/",
            secure="secure" in params,
            http_only="httponly" in params,
            same_site=params.get("samesite"),
        )

    @property
    def _storage_key(self) -> tuple[str, str, str]:
        """The ``(domain, path, decoded_key)`` triple identifying the cookie."""
        return (self.domain, self.path, self.decoded_key)

    @property
    def _should_delete(self) -> bool:
        """Whether the cookie asks to be removed: ``max_age`` is ``0`` or
        ``expires`` is exactly the Unix epoch.
        """
        if self.max_age == 0:
            return True

        return self.expires is not None and self.expires.timestamp() == 0