Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/test.py: 27%
572 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1import mimetypes
2import sys
3import typing as t
4from collections import defaultdict
5from datetime import datetime
6from datetime import timedelta
7from http.cookiejar import CookieJar
8from io import BytesIO
9from itertools import chain
10from random import random
11from tempfile import TemporaryFile
12from time import time
13from urllib.request import Request as _UrllibRequest
15from ._internal import _get_environ
16from ._internal import _make_encode_wrapper
17from ._internal import _wsgi_decoding_dance
18from ._internal import _wsgi_encoding_dance
19from .datastructures import Authorization
20from .datastructures import CallbackDict
21from .datastructures import CombinedMultiDict
22from .datastructures import EnvironHeaders
23from .datastructures import FileMultiDict
24from .datastructures import Headers
25from .datastructures import MultiDict
26from .http import dump_cookie
27from .http import dump_options_header
28from .http import parse_options_header
29from .sansio.multipart import Data
30from .sansio.multipart import Epilogue
31from .sansio.multipart import Field
32from .sansio.multipart import File
33from .sansio.multipart import MultipartEncoder
34from .sansio.multipart import Preamble
35from .urls import iri_to_uri
36from .urls import url_encode
37from .urls import url_fix
38from .urls import url_parse
39from .urls import url_unparse
40from .urls import url_unquote
41from .utils import cached_property
42from .utils import get_content_type
43from .wrappers.request import Request
44from .wrappers.response import Response
45from .wsgi import ClosingIterator
46from .wsgi import get_current_url
48if t.TYPE_CHECKING:
49 from _typeshed.wsgi import WSGIApplication
50 from _typeshed.wsgi import WSGIEnvironment
53def stream_encode_multipart(
54 data: t.Mapping[str, t.Any],
55 use_tempfile: bool = True,
56 threshold: int = 1024 * 500,
57 boundary: t.Optional[str] = None,
58 charset: str = "utf-8",
59) -> t.Tuple[t.IO[bytes], int, str]:
60 """Encode a dict of values (either strings or file descriptors or
61 :class:`FileStorage` objects.) into a multipart encoded string stored
62 in a file descriptor.
63 """
64 if boundary is None:
65 boundary = f"---------------WerkzeugFormPart_{time()}{random()}"
67 stream: t.IO[bytes] = BytesIO()
68 total_length = 0
69 on_disk = False
70 write_binary: t.Callable[[bytes], int]
72 if use_tempfile:
74 def write_binary(s: bytes) -> int:
75 nonlocal stream, total_length, on_disk
77 if on_disk:
78 return stream.write(s)
79 else:
80 length = len(s)
82 if length + total_length <= threshold:
83 stream.write(s)
84 else:
85 new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
86 new_stream.write(stream.getvalue()) # type: ignore
87 new_stream.write(s)
88 stream = new_stream
89 on_disk = True
91 total_length += length
92 return length
94 else:
95 write_binary = stream.write
97 encoder = MultipartEncoder(boundary.encode())
98 write_binary(encoder.send_event(Preamble(data=b"")))
99 for key, value in _iter_data(data):
100 reader = getattr(value, "read", None)
101 if reader is not None:
102 filename = getattr(value, "filename", getattr(value, "name", None))
103 content_type = getattr(value, "content_type", None)
104 if content_type is None:
105 content_type = (
106 filename
107 and mimetypes.guess_type(filename)[0]
108 or "application/octet-stream"
109 )
110 headers = Headers([("Content-Type", content_type)])
111 if filename is None:
112 write_binary(encoder.send_event(Field(name=key, headers=headers)))
113 else:
114 write_binary(
115 encoder.send_event(
116 File(name=key, filename=filename, headers=headers)
117 )
118 )
119 while True:
120 chunk = reader(16384)
122 if not chunk:
123 break
125 write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
126 else:
127 if not isinstance(value, str):
128 value = str(value)
129 write_binary(encoder.send_event(Field(name=key, headers=Headers())))
130 write_binary(
131 encoder.send_event(Data(data=value.encode(charset), more_data=False))
132 )
134 write_binary(encoder.send_event(Epilogue(data=b"")))
136 length = stream.tell()
137 stream.seek(0)
138 return stream, length, boundary
141def encode_multipart(
142 values: t.Mapping[str, t.Any],
143 boundary: t.Optional[str] = None,
144 charset: str = "utf-8",
145) -> t.Tuple[str, bytes]:
146 """Like `stream_encode_multipart` but returns a tuple in the form
147 (``boundary``, ``data``) where data is bytes.
148 """
149 stream, length, boundary = stream_encode_multipart(
150 values, use_tempfile=False, boundary=boundary, charset=charset
151 )
152 return boundary, stream.read()
155class _TestCookieHeaders:
156 """A headers adapter for cookielib"""
158 def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
159 self.headers = headers
161 def getheaders(self, name: str) -> t.Iterable[str]:
162 headers = []
163 name = name.lower()
164 for k, v in self.headers:
165 if k.lower() == name:
166 headers.append(v)
167 return headers
169 def get_all(
170 self, name: str, default: t.Optional[t.Iterable[str]] = None
171 ) -> t.Iterable[str]:
172 headers = self.getheaders(name)
174 if not headers:
175 return default # type: ignore
177 return headers
180class _TestCookieResponse:
181 """Something that looks like a httplib.HTTPResponse, but is actually just an
182 adapter for our test responses to make them available for cookielib.
183 """
185 def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
186 self.headers = _TestCookieHeaders(headers)
188 def info(self) -> _TestCookieHeaders:
189 return self.headers
192class _TestCookieJar(CookieJar):
193 """A cookielib.CookieJar modified to inject and read cookie headers from
194 and to wsgi environments, and wsgi application responses.
195 """
197 def inject_wsgi(self, environ: "WSGIEnvironment") -> None:
198 """Inject the cookies as client headers into the server's wsgi
199 environment.
200 """
201 cvals = [f"{c.name}={c.value}" for c in self]
203 if cvals:
204 environ["HTTP_COOKIE"] = "; ".join(cvals)
205 else:
206 environ.pop("HTTP_COOKIE", None)
208 def extract_wsgi(
209 self,
210 environ: "WSGIEnvironment",
211 headers: t.Union[Headers, t.List[t.Tuple[str, str]]],
212 ) -> None:
213 """Extract the server's set-cookie headers as cookies into the
214 cookie jar.
215 """
216 self.extract_cookies(
217 _TestCookieResponse(headers), # type: ignore
218 _UrllibRequest(get_current_url(environ)),
219 )
222def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[t.Tuple[str, t.Any]]:
223 """Iterate over a mapping that might have a list of values, yielding
224 all key, value pairs. Almost like iter_multi_items but only allows
225 lists, not tuples, of values so tuples can be used for files.
226 """
227 if isinstance(data, MultiDict):
228 yield from data.items(multi=True)
229 else:
230 for key, value in data.items():
231 if isinstance(value, list):
232 for v in value:
233 yield key, v
234 else:
235 yield key, value
238_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound=MultiDict)
241class EnvironBuilder:
242 """This class can be used to conveniently create a WSGI environment
243 for testing purposes. It can be used to quickly create WSGI environments
244 or request objects from arbitrary data.
246 The signature of this class is also used in some other places as of
247 Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
248 :meth:`Client.open`). Because of this most of the functionality is
249 available through the constructor alone.
251 Files and regular form data can be manipulated independently of each
252 other with the :attr:`form` and :attr:`files` attributes, but are
253 passed with the same argument to the constructor: `data`.
255 `data` can be any of these values:
257 - a `str` or `bytes` object: The object is converted into an
258 :attr:`input_stream`, the :attr:`content_length` is set and you have to
259 provide a :attr:`content_type`.
260 - a `dict` or :class:`MultiDict`: The keys have to be strings. The values
261 have to be either any of the following objects, or a list of any of the
262 following objects:
264 - a :class:`file`-like object: These are converted into
265 :class:`FileStorage` objects automatically.
266 - a `tuple`: The :meth:`~FileMultiDict.add_file` method is called
267 with the key and the unpacked `tuple` items as positional
268 arguments.
269 - a `str`: The string is set as form data for the associated key.
270 - a file-like object: The object content is loaded in memory and then
271 handled like a regular `str` or a `bytes`.
273 :param path: the path of the request. In the WSGI environment this will
274 end up as `PATH_INFO`. If the `query_string` is not defined
275 and there is a question mark in the `path` everything after
276 it is used as query string.
277 :param base_url: the base URL is a URL that is used to extract the WSGI
278 URL scheme, host (server name + server port) and the
279 script root (`SCRIPT_NAME`).
280 :param query_string: an optional string or dict with URL parameters.
281 :param method: the HTTP method to use, defaults to `GET`.
282 :param input_stream: an optional input stream. Do not specify this and
283 `data`. As soon as an input stream is set you can't
284 modify :attr:`args` and :attr:`files` unless you
285 set the :attr:`input_stream` to `None` again.
286 :param content_type: The content type for the request. As of 0.5 you
287 don't have to provide this when specifying files
288 and form data via `data`.
289 :param content_length: The content length for the request. You don't
290 have to specify this when providing data via
291 `data`.
292 :param errors_stream: an optional error stream that is used for
293 `wsgi.errors`. Defaults to :data:`stderr`.
294 :param multithread: controls `wsgi.multithread`. Defaults to `False`.
295 :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`.
296 :param run_once: controls `wsgi.run_once`. Defaults to `False`.
297 :param headers: an optional list or :class:`Headers` object of headers.
298 :param data: a string or dict of form data or a file-object.
299 See explanation above.
300 :param json: An object to be serialized and assigned to ``data``.
301 Defaults the content type to ``"application/json"``.
302 Serialized with the function assigned to :attr:`json_dumps`.
303 :param environ_base: an optional dict of environment defaults.
304 :param environ_overrides: an optional dict of environment overrides.
305 :param charset: the charset used to encode string data.
306 :param auth: An authorization object to use for the
307 ``Authorization`` header value. A ``(username, password)`` tuple
308 is a shortcut for ``Basic`` authorization.
310 .. versionchanged:: 2.1
311 ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
312 header keys in the environ.
314 .. versionchanged:: 2.0
315 ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
316 the query string, not only the path.
318 .. versionchanged:: 2.0
319 The default :attr:`request_class` is ``Request`` instead of
320 ``BaseRequest``.
322 .. versionadded:: 2.0
323 Added the ``auth`` parameter.
325 .. versionadded:: 0.15
326 The ``json`` param and :meth:`json_dumps` method.
328 .. versionadded:: 0.15
329 The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
330 the path before percent-decoding. This is not part of the WSGI
331 PEP, but many WSGI servers include it.
333 .. versionchanged:: 0.6
334 ``path`` and ``base_url`` can now be unicode strings that are
335 encoded with :func:`iri_to_uri`.
336 """
338 #: the server protocol to use. defaults to HTTP/1.1
339 server_protocol = "HTTP/1.1"
341 #: the wsgi version to use. defaults to (1, 0)
342 wsgi_version = (1, 0)
344 #: The default request class used by :meth:`get_request`.
345 request_class = Request
347 import json
349 #: The serialization function used when ``json`` is passed.
350 json_dumps = staticmethod(json.dumps)
351 del json
353 _args: t.Optional[MultiDict]
354 _query_string: t.Optional[str]
355 _input_stream: t.Optional[t.IO[bytes]]
356 _form: t.Optional[MultiDict]
357 _files: t.Optional[FileMultiDict]
359 def __init__(
360 self,
361 path: str = "/",
362 base_url: t.Optional[str] = None,
363 query_string: t.Optional[t.Union[t.Mapping[str, str], str]] = None,
364 method: str = "GET",
365 input_stream: t.Optional[t.IO[bytes]] = None,
366 content_type: t.Optional[str] = None,
367 content_length: t.Optional[int] = None,
368 errors_stream: t.Optional[t.IO[str]] = None,
369 multithread: bool = False,
370 multiprocess: bool = False,
371 run_once: bool = False,
372 headers: t.Optional[t.Union[Headers, t.Iterable[t.Tuple[str, str]]]] = None,
373 data: t.Optional[
374 t.Union[t.IO[bytes], str, bytes, t.Mapping[str, t.Any]]
375 ] = None,
376 environ_base: t.Optional[t.Mapping[str, t.Any]] = None,
377 environ_overrides: t.Optional[t.Mapping[str, t.Any]] = None,
378 charset: str = "utf-8",
379 mimetype: t.Optional[str] = None,
380 json: t.Optional[t.Mapping[str, t.Any]] = None,
381 auth: t.Optional[t.Union[Authorization, t.Tuple[str, str]]] = None,
382 ) -> None:
383 path_s = _make_encode_wrapper(path)
384 if query_string is not None and path_s("?") in path:
385 raise ValueError("Query string is defined in the path and as an argument")
386 request_uri = url_parse(path)
387 if query_string is None and path_s("?") in path:
388 query_string = request_uri.query
389 self.charset = charset
390 self.path = iri_to_uri(request_uri.path)
391 self.request_uri = path
392 if base_url is not None:
393 base_url = url_fix(iri_to_uri(base_url, charset), charset)
394 self.base_url = base_url # type: ignore
395 if isinstance(query_string, (bytes, str)):
396 self.query_string = query_string
397 else:
398 if query_string is None:
399 query_string = MultiDict()
400 elif not isinstance(query_string, MultiDict):
401 query_string = MultiDict(query_string)
402 self.args = query_string
403 self.method = method
404 if headers is None:
405 headers = Headers()
406 elif not isinstance(headers, Headers):
407 headers = Headers(headers)
408 self.headers = headers
409 if content_type is not None:
410 self.content_type = content_type
411 if errors_stream is None:
412 errors_stream = sys.stderr
413 self.errors_stream = errors_stream
414 self.multithread = multithread
415 self.multiprocess = multiprocess
416 self.run_once = run_once
417 self.environ_base = environ_base
418 self.environ_overrides = environ_overrides
419 self.input_stream = input_stream
420 self.content_length = content_length
421 self.closed = False
423 if auth is not None:
424 if isinstance(auth, tuple):
425 auth = Authorization(
426 "basic", {"username": auth[0], "password": auth[1]}
427 )
429 self.headers.set("Authorization", auth.to_header())
431 if json is not None:
432 if data is not None:
433 raise TypeError("can't provide both json and data")
435 data = self.json_dumps(json)
437 if self.content_type is None:
438 self.content_type = "application/json"
440 if data:
441 if input_stream is not None:
442 raise TypeError("can't provide input stream and data")
443 if hasattr(data, "read"):
444 data = data.read() # type: ignore
445 if isinstance(data, str):
446 data = data.encode(self.charset)
447 if isinstance(data, bytes):
448 self.input_stream = BytesIO(data)
449 if self.content_length is None:
450 self.content_length = len(data)
451 else:
452 for key, value in _iter_data(data): # type: ignore
453 if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
454 self._add_file_from_data(key, value)
455 else:
456 self.form.setlistdefault(key).append(value)
458 if mimetype is not None:
459 self.mimetype = mimetype
461 @classmethod
462 def from_environ(
463 cls, environ: "WSGIEnvironment", **kwargs: t.Any
464 ) -> "EnvironBuilder":
465 """Turn an environ dict back into a builder. Any extra kwargs
466 override the args extracted from the environ.
468 .. versionchanged:: 2.0
469 Path and query values are passed through the WSGI decoding
470 dance to avoid double encoding.
472 .. versionadded:: 0.15
473 """
474 headers = Headers(EnvironHeaders(environ))
475 out = {
476 "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
477 "base_url": cls._make_base_url(
478 environ["wsgi.url_scheme"],
479 headers.pop("Host"),
480 _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
481 ),
482 "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
483 "method": environ["REQUEST_METHOD"],
484 "input_stream": environ["wsgi.input"],
485 "content_type": headers.pop("Content-Type", None),
486 "content_length": headers.pop("Content-Length", None),
487 "errors_stream": environ["wsgi.errors"],
488 "multithread": environ["wsgi.multithread"],
489 "multiprocess": environ["wsgi.multiprocess"],
490 "run_once": environ["wsgi.run_once"],
491 "headers": headers,
492 }
493 out.update(kwargs)
494 return cls(**out)
496 def _add_file_from_data(
497 self,
498 key: str,
499 value: t.Union[
500 t.IO[bytes], t.Tuple[t.IO[bytes], str], t.Tuple[t.IO[bytes], str, str]
501 ],
502 ) -> None:
503 """Called in the EnvironBuilder to add files from the data dict."""
504 if isinstance(value, tuple):
505 self.files.add_file(key, *value)
506 else:
507 self.files.add_file(key, value)
509 @staticmethod
510 def _make_base_url(scheme: str, host: str, script_root: str) -> str:
511 return url_unparse((scheme, host, script_root, "", "")).rstrip("/") + "/"
513 @property
514 def base_url(self) -> str:
515 """The base URL is used to extract the URL scheme, host name,
516 port, and root path.
517 """
518 return self._make_base_url(self.url_scheme, self.host, self.script_root)
520 @base_url.setter
521 def base_url(self, value: t.Optional[str]) -> None:
522 if value is None:
523 scheme = "http"
524 netloc = "localhost"
525 script_root = ""
526 else:
527 scheme, netloc, script_root, qs, anchor = url_parse(value)
528 if qs or anchor:
529 raise ValueError("base url must not contain a query string or fragment")
530 self.script_root = script_root.rstrip("/")
531 self.host = netloc
532 self.url_scheme = scheme
534 @property
535 def content_type(self) -> t.Optional[str]:
536 """The content type for the request. Reflected from and to
537 the :attr:`headers`. Do not set if you set :attr:`files` or
538 :attr:`form` for auto detection.
539 """
540 ct = self.headers.get("Content-Type")
541 if ct is None and not self._input_stream:
542 if self._files:
543 return "multipart/form-data"
544 if self._form:
545 return "application/x-www-form-urlencoded"
546 return None
547 return ct
549 @content_type.setter
550 def content_type(self, value: t.Optional[str]) -> None:
551 if value is None:
552 self.headers.pop("Content-Type", None)
553 else:
554 self.headers["Content-Type"] = value
556 @property
557 def mimetype(self) -> t.Optional[str]:
558 """The mimetype (content type without charset etc.)
560 .. versionadded:: 0.14
561 """
562 ct = self.content_type
563 return ct.split(";")[0].strip() if ct else None
565 @mimetype.setter
566 def mimetype(self, value: str) -> None:
567 self.content_type = get_content_type(value, self.charset)
569 @property
570 def mimetype_params(self) -> t.Mapping[str, str]:
571 """The mimetype parameters as dict. For example if the
572 content type is ``text/html; charset=utf-8`` the params would be
573 ``{'charset': 'utf-8'}``.
575 .. versionadded:: 0.14
576 """
578 def on_update(d: CallbackDict) -> None:
579 self.headers["Content-Type"] = dump_options_header(self.mimetype, d)
581 d = parse_options_header(self.headers.get("content-type", ""))[1]
582 return CallbackDict(d, on_update)
584 @property
585 def content_length(self) -> t.Optional[int]:
586 """The content length as integer. Reflected from and to the
587 :attr:`headers`. Do not set if you set :attr:`files` or
588 :attr:`form` for auto detection.
589 """
590 return self.headers.get("Content-Length", type=int)
592 @content_length.setter
593 def content_length(self, value: t.Optional[int]) -> None:
594 if value is None:
595 self.headers.pop("Content-Length", None)
596 else:
597 self.headers["Content-Length"] = str(value)
599 def _get_form(self, name: str, storage: t.Type[_TAnyMultiDict]) -> _TAnyMultiDict:
600 """Common behavior for getting the :attr:`form` and
601 :attr:`files` properties.
603 :param name: Name of the internal cached attribute.
604 :param storage: Storage class used for the data.
605 """
606 if self.input_stream is not None:
607 raise AttributeError("an input stream is defined")
609 rv = getattr(self, name)
611 if rv is None:
612 rv = storage()
613 setattr(self, name, rv)
615 return rv # type: ignore
617 def _set_form(self, name: str, value: MultiDict) -> None:
618 """Common behavior for setting the :attr:`form` and
619 :attr:`files` properties.
621 :param name: Name of the internal cached attribute.
622 :param value: Value to assign to the attribute.
623 """
624 self._input_stream = None
625 setattr(self, name, value)
627 @property
628 def form(self) -> MultiDict:
629 """A :class:`MultiDict` of form values."""
630 return self._get_form("_form", MultiDict)
632 @form.setter
633 def form(self, value: MultiDict) -> None:
634 self._set_form("_form", value)
636 @property
637 def files(self) -> FileMultiDict:
638 """A :class:`FileMultiDict` of uploaded files. Use
639 :meth:`~FileMultiDict.add_file` to add new files.
640 """
641 return self._get_form("_files", FileMultiDict)
643 @files.setter
644 def files(self, value: FileMultiDict) -> None:
645 self._set_form("_files", value)
647 @property
648 def input_stream(self) -> t.Optional[t.IO[bytes]]:
649 """An optional input stream. This is mutually exclusive with
650 setting :attr:`form` and :attr:`files`, setting it will clear
651 those. Do not provide this if the method is not ``POST`` or
652 another method that has a body.
653 """
654 return self._input_stream
656 @input_stream.setter
657 def input_stream(self, value: t.Optional[t.IO[bytes]]) -> None:
658 self._input_stream = value
659 self._form = None
660 self._files = None
662 @property
663 def query_string(self) -> str:
664 """The query string. If you set this to a string
665 :attr:`args` will no longer be available.
666 """
667 if self._query_string is None:
668 if self._args is not None:
669 return url_encode(self._args, charset=self.charset)
670 return ""
671 return self._query_string
673 @query_string.setter
674 def query_string(self, value: t.Optional[str]) -> None:
675 self._query_string = value
676 self._args = None
678 @property
679 def args(self) -> MultiDict:
680 """The URL arguments as :class:`MultiDict`."""
681 if self._query_string is not None:
682 raise AttributeError("a query string is defined")
683 if self._args is None:
684 self._args = MultiDict()
685 return self._args
687 @args.setter
688 def args(self, value: t.Optional[MultiDict]) -> None:
689 self._query_string = None
690 self._args = value
692 @property
693 def server_name(self) -> str:
694 """The server name (read-only, use :attr:`host` to set)"""
695 return self.host.split(":", 1)[0]
697 @property
698 def server_port(self) -> int:
699 """The server port as integer (read-only, use :attr:`host` to set)"""
700 pieces = self.host.split(":", 1)
702 if len(pieces) == 2:
703 try:
704 return int(pieces[1])
705 except ValueError:
706 pass
708 if self.url_scheme == "https":
709 return 443
710 return 80
712 def __del__(self) -> None:
713 try:
714 self.close()
715 except Exception:
716 pass
718 def close(self) -> None:
719 """Closes all files. If you put real :class:`file` objects into the
720 :attr:`files` dict you can call this method to automatically close
721 them all in one go.
722 """
723 if self.closed:
724 return
725 try:
726 files = self.files.values()
727 except AttributeError:
728 files = () # type: ignore
729 for f in files:
730 try:
731 f.close()
732 except Exception:
733 pass
734 self.closed = True
736 def get_environ(self) -> "WSGIEnvironment":
737 """Return the built environ.
739 .. versionchanged:: 0.15
740 The content type and length headers are set based on
741 input stream detection. Previously this only set the WSGI
742 keys.
743 """
744 input_stream = self.input_stream
745 content_length = self.content_length
747 mimetype = self.mimetype
748 content_type = self.content_type
750 if input_stream is not None:
751 start_pos = input_stream.tell()
752 input_stream.seek(0, 2)
753 end_pos = input_stream.tell()
754 input_stream.seek(start_pos)
755 content_length = end_pos - start_pos
756 elif mimetype == "multipart/form-data":
757 input_stream, content_length, boundary = stream_encode_multipart(
758 CombinedMultiDict([self.form, self.files]), charset=self.charset
759 )
760 content_type = f'{mimetype}; boundary="{boundary}"'
761 elif mimetype == "application/x-www-form-urlencoded":
762 form_encoded = url_encode(self.form, charset=self.charset).encode("ascii")
763 content_length = len(form_encoded)
764 input_stream = BytesIO(form_encoded)
765 else:
766 input_stream = BytesIO()
768 result: "WSGIEnvironment" = {}
769 if self.environ_base:
770 result.update(self.environ_base)
772 def _path_encode(x: str) -> str:
773 return _wsgi_encoding_dance(url_unquote(x, self.charset), self.charset)
775 raw_uri = _wsgi_encoding_dance(self.request_uri, self.charset)
776 result.update(
777 {
778 "REQUEST_METHOD": self.method,
779 "SCRIPT_NAME": _path_encode(self.script_root),
780 "PATH_INFO": _path_encode(self.path),
781 "QUERY_STRING": _wsgi_encoding_dance(self.query_string, self.charset),
782 # Non-standard, added by mod_wsgi, uWSGI
783 "REQUEST_URI": raw_uri,
784 # Non-standard, added by gunicorn
785 "RAW_URI": raw_uri,
786 "SERVER_NAME": self.server_name,
787 "SERVER_PORT": str(self.server_port),
788 "HTTP_HOST": self.host,
789 "SERVER_PROTOCOL": self.server_protocol,
790 "wsgi.version": self.wsgi_version,
791 "wsgi.url_scheme": self.url_scheme,
792 "wsgi.input": input_stream,
793 "wsgi.errors": self.errors_stream,
794 "wsgi.multithread": self.multithread,
795 "wsgi.multiprocess": self.multiprocess,
796 "wsgi.run_once": self.run_once,
797 }
798 )
800 headers = self.headers.copy()
801 # Don't send these as headers, they're part of the environ.
802 headers.remove("Content-Type")
803 headers.remove("Content-Length")
805 if content_type is not None:
806 result["CONTENT_TYPE"] = content_type
808 if content_length is not None:
809 result["CONTENT_LENGTH"] = str(content_length)
811 combined_headers = defaultdict(list)
813 for key, value in headers.to_wsgi_list():
814 combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)
816 for key, values in combined_headers.items():
817 result[key] = ", ".join(values)
819 if self.environ_overrides:
820 result.update(self.environ_overrides)
822 return result
824 def get_request(self, cls: t.Optional[t.Type[Request]] = None) -> Request:
825 """Returns a request with the data. If the request class is not
826 specified :attr:`request_class` is used.
828 :param cls: The request wrapper to use.
829 """
830 if cls is None:
831 cls = self.request_class
833 return cls(self.get_environ())
836class ClientRedirectError(Exception):
837 """If a redirect loop is detected when using follow_redirects=True with
838 the :cls:`Client`, then this exception is raised.
839 """
842class Client:
843 """This class allows you to send requests to a wrapped application.
845 The use_cookies parameter indicates whether cookies should be stored and
846 sent for subsequent requests. This is True by default, but passing False
847 will disable this behaviour.
849 If you want to request some subdomain of your application you may set
850 `allow_subdomain_redirects` to `True` as if not no external redirects
851 are allowed.
853 .. versionchanged:: 2.1
854 Removed deprecated behavior of treating the response as a
855 tuple. All data is available as properties on the returned
856 response object.
858 .. versionchanged:: 2.0
859 ``response_wrapper`` is always a subclass of
860 :class:``TestResponse``.
862 .. versionchanged:: 0.5
863 Added the ``use_cookies`` parameter.
864 """
866 def __init__(
867 self,
868 application: "WSGIApplication",
869 response_wrapper: t.Optional[t.Type["Response"]] = None,
870 use_cookies: bool = True,
871 allow_subdomain_redirects: bool = False,
872 ) -> None:
873 self.application = application
875 if response_wrapper in {None, Response}:
876 response_wrapper = TestResponse
877 elif not isinstance(response_wrapper, TestResponse):
878 response_wrapper = type(
879 "WrapperTestResponse",
880 (TestResponse, response_wrapper), # type: ignore
881 {},
882 )
884 self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper)
886 if use_cookies:
887 self.cookie_jar: t.Optional[_TestCookieJar] = _TestCookieJar()
888 else:
889 self.cookie_jar = None
891 self.allow_subdomain_redirects = allow_subdomain_redirects
893 def set_cookie(
894 self,
895 server_name: str,
896 key: str,
897 value: str = "",
898 max_age: t.Optional[t.Union[timedelta, int]] = None,
899 expires: t.Optional[t.Union[str, datetime, int, float]] = None,
900 path: str = "/",
901 domain: t.Optional[str] = None,
902 secure: bool = False,
903 httponly: bool = False,
904 samesite: t.Optional[str] = None,
905 charset: str = "utf-8",
906 ) -> None:
907 """Sets a cookie in the client's cookie jar. The server name
908 is required and has to match the one that is also passed to
909 the open call.
910 """
911 assert self.cookie_jar is not None, "cookies disabled"
912 header = dump_cookie(
913 key,
914 value,
915 max_age,
916 expires,
917 path,
918 domain,
919 secure,
920 httponly,
921 charset,
922 samesite=samesite,
923 )
924 environ = create_environ(path, base_url=f"http://{server_name}")
925 headers = [("Set-Cookie", header)]
926 self.cookie_jar.extract_wsgi(environ, headers)
928 def delete_cookie(
929 self,
930 server_name: str,
931 key: str,
932 path: str = "/",
933 domain: t.Optional[str] = None,
934 secure: bool = False,
935 httponly: bool = False,
936 samesite: t.Optional[str] = None,
937 ) -> None:
938 """Deletes a cookie in the test client."""
939 self.set_cookie(
940 server_name,
941 key,
942 expires=0,
943 max_age=0,
944 path=path,
945 domain=domain,
946 secure=secure,
947 httponly=httponly,
948 samesite=samesite,
949 )
951 def run_wsgi_app(
952 self, environ: "WSGIEnvironment", buffered: bool = False
953 ) -> t.Tuple[t.Iterable[bytes], str, Headers]:
954 """Runs the wrapped WSGI app with the given environment.
956 :meta private:
957 """
958 if self.cookie_jar is not None:
959 self.cookie_jar.inject_wsgi(environ)
961 rv = run_wsgi_app(self.application, environ, buffered=buffered)
963 if self.cookie_jar is not None:
964 self.cookie_jar.extract_wsgi(environ, rv[2])
966 return rv
968 def resolve_redirect(
969 self, response: "TestResponse", buffered: bool = False
970 ) -> "TestResponse":
971 """Perform a new request to the location given by the redirect
972 response to the previous request.
974 :meta private:
975 """
976 scheme, netloc, path, qs, anchor = url_parse(response.location)
977 builder = EnvironBuilder.from_environ(
978 response.request.environ, path=path, query_string=qs
979 )
981 to_name_parts = netloc.split(":", 1)[0].split(".")
982 from_name_parts = builder.server_name.split(".")
984 if to_name_parts != [""]:
985 # The new location has a host, use it for the base URL.
986 builder.url_scheme = scheme
987 builder.host = netloc
988 else:
989 # A local redirect with autocorrect_location_header=False
990 # doesn't have a host, so use the request's host.
991 to_name_parts = from_name_parts
993 # Explain why a redirect to a different server name won't be followed.
994 if to_name_parts != from_name_parts:
995 if to_name_parts[-len(from_name_parts) :] == from_name_parts:
996 if not self.allow_subdomain_redirects:
997 raise RuntimeError("Following subdomain redirects is not enabled.")
998 else:
999 raise RuntimeError("Following external redirects is not supported.")
1001 path_parts = path.split("/")
1002 root_parts = builder.script_root.split("/")
1004 if path_parts[: len(root_parts)] == root_parts:
1005 # Strip the script root from the path.
1006 builder.path = path[len(builder.script_root) :]
1007 else:
1008 # The new location is not under the script root, so use the
1009 # whole path and clear the previous root.
1010 builder.path = path
1011 builder.script_root = ""
1013 # Only 307 and 308 preserve all of the original request.
1014 if response.status_code not in {307, 308}:
1015 # HEAD is preserved, everything else becomes GET.
1016 if builder.method != "HEAD":
1017 builder.method = "GET"
1019 # Clear the body and the headers that describe it.
1021 if builder.input_stream is not None:
1022 builder.input_stream.close()
1023 builder.input_stream = None
1025 builder.content_type = None
1026 builder.content_length = None
1027 builder.headers.pop("Transfer-Encoding", None)
1029 return self.open(builder, buffered=buffered)
1031 def open(
1032 self,
1033 *args: t.Any,
1034 buffered: bool = False,
1035 follow_redirects: bool = False,
1036 **kwargs: t.Any,
1037 ) -> "TestResponse":
1038 """Generate an environ dict from the given arguments, make a
1039 request to the application using it, and return the response.
1041 :param args: Passed to :class:`EnvironBuilder` to create the
1042 environ for the request. If a single arg is passed, it can
1043 be an existing :class:`EnvironBuilder` or an environ dict.
1044 :param buffered: Convert the iterator returned by the app into
1045 a list. If the iterator has a ``close()`` method, it is
1046 called automatically.
1047 :param follow_redirects: Make additional requests to follow HTTP
1048 redirects until a non-redirect status is returned.
1049 :attr:`TestResponse.history` lists the intermediate
1050 responses.
1052 .. versionchanged:: 2.1
1053 Removed the ``as_tuple`` parameter.
1055 .. versionchanged:: 2.0
1056 ``as_tuple`` is deprecated and will be removed in Werkzeug
1057 2.1. Use :attr:`TestResponse.request` and
1058 ``request.environ`` instead.
1060 .. versionchanged:: 2.0
1061 The request input stream is closed when calling
1062 ``response.close()``. Input streams for redirects are
1063 automatically closed.
1065 .. versionchanged:: 0.5
1066 If a dict is provided as file in the dict for the ``data``
1067 parameter the content type has to be called ``content_type``
1068 instead of ``mimetype``. This change was made for
1069 consistency with :class:`werkzeug.FileWrapper`.
1071 .. versionchanged:: 0.5
1072 Added the ``follow_redirects`` parameter.
1073 """
1074 request: t.Optional["Request"] = None
1076 if not kwargs and len(args) == 1:
1077 arg = args[0]
1079 if isinstance(arg, EnvironBuilder):
1080 request = arg.get_request()
1081 elif isinstance(arg, dict):
1082 request = EnvironBuilder.from_environ(arg).get_request()
1083 elif isinstance(arg, Request):
1084 request = arg
1086 if request is None:
1087 builder = EnvironBuilder(*args, **kwargs)
1089 try:
1090 request = builder.get_request()
1091 finally:
1092 builder.close()
1094 response = self.run_wsgi_app(request.environ, buffered=buffered)
1095 response = self.response_wrapper(*response, request=request)
1097 redirects = set()
1098 history: t.List["TestResponse"] = []
1100 if not follow_redirects:
1101 return response
1103 while response.status_code in {
1104 301,
1105 302,
1106 303,
1107 305,
1108 307,
1109 308,
1110 }:
1111 # Exhaust intermediate response bodies to ensure middleware
1112 # that returns an iterator runs any cleanup code.
1113 if not buffered:
1114 response.make_sequence()
1115 response.close()
1117 new_redirect_entry = (response.location, response.status_code)
1119 if new_redirect_entry in redirects:
1120 raise ClientRedirectError(
1121 f"Loop detected: A {response.status_code} redirect"
1122 f" to {response.location} was already made."
1123 )
1125 redirects.add(new_redirect_entry)
1126 response.history = tuple(history)
1127 history.append(response)
1128 response = self.resolve_redirect(response, buffered=buffered)
1129 else:
1130 # This is the final request after redirects.
1131 response.history = tuple(history)
1132 # Close the input stream when closing the response, in case
1133 # the input is an open temporary file.
1134 response.call_on_close(request.input_stream.close)
1135 return response
1137 def get(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1138 """Call :meth:`open` with ``method`` set to ``GET``."""
1139 kw["method"] = "GET"
1140 return self.open(*args, **kw)
1142 def post(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1143 """Call :meth:`open` with ``method`` set to ``POST``."""
1144 kw["method"] = "POST"
1145 return self.open(*args, **kw)
1147 def put(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1148 """Call :meth:`open` with ``method`` set to ``PUT``."""
1149 kw["method"] = "PUT"
1150 return self.open(*args, **kw)
1152 def delete(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1153 """Call :meth:`open` with ``method`` set to ``DELETE``."""
1154 kw["method"] = "DELETE"
1155 return self.open(*args, **kw)
1157 def patch(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1158 """Call :meth:`open` with ``method`` set to ``PATCH``."""
1159 kw["method"] = "PATCH"
1160 return self.open(*args, **kw)
1162 def options(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1163 """Call :meth:`open` with ``method`` set to ``OPTIONS``."""
1164 kw["method"] = "OPTIONS"
1165 return self.open(*args, **kw)
1167 def head(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1168 """Call :meth:`open` with ``method`` set to ``HEAD``."""
1169 kw["method"] = "HEAD"
1170 return self.open(*args, **kw)
1172 def trace(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1173 """Call :meth:`open` with ``method`` set to ``TRACE``."""
1174 kw["method"] = "TRACE"
1175 return self.open(*args, **kw)
1177 def __repr__(self) -> str:
1178 return f"<{type(self).__name__} {self.application!r}>"
1181def create_environ(*args: t.Any, **kwargs: t.Any) -> "WSGIEnvironment":
1182 """Create a new WSGI environ dict based on the values passed. The first
1183 parameter should be the path of the request which defaults to '/'. The
1184 second one can either be an absolute path (in that case the host is
1185 localhost:80) or a full path to the request with scheme, netloc port and
1186 the path to the script.
1188 This accepts the same arguments as the :class:`EnvironBuilder`
1189 constructor.
1191 .. versionchanged:: 0.5
1192 This function is now a thin wrapper over :class:`EnvironBuilder` which
1193 was added in 0.5. The `headers`, `environ_base`, `environ_overrides`
1194 and `charset` parameters were added.
1195 """
1196 builder = EnvironBuilder(*args, **kwargs)
1198 try:
1199 return builder.get_environ()
1200 finally:
1201 builder.close()
1204def run_wsgi_app(
1205 app: "WSGIApplication", environ: "WSGIEnvironment", buffered: bool = False
1206) -> t.Tuple[t.Iterable[bytes], str, Headers]:
1207 """Return a tuple in the form (app_iter, status, headers) of the
1208 application output. This works best if you pass it an application that
1209 returns an iterator all the time.
1211 Sometimes applications may use the `write()` callable returned
1212 by the `start_response` function. This tries to resolve such edge
1213 cases automatically. But if you don't get the expected output you
1214 should set `buffered` to `True` which enforces buffering.
1216 If passed an invalid WSGI application the behavior of this function is
1217 undefined. Never pass non-conforming WSGI applications to this function.
1219 :param app: the application to execute.
1220 :param buffered: set to `True` to enforce buffering.
1221 :return: tuple in the form ``(app_iter, status, headers)``
1222 """
1223 # Copy environ to ensure any mutations by the app (ProxyFix, for
1224 # example) don't affect subsequent requests (such as redirects).
1225 environ = _get_environ(environ).copy()
1226 status: str
1227 response: t.Optional[t.Tuple[str, t.List[t.Tuple[str, str]]]] = None
1228 buffer: t.List[bytes] = []
1230 def start_response(status, headers, exc_info=None): # type: ignore
1231 nonlocal response
1233 if exc_info:
1234 try:
1235 raise exc_info[1].with_traceback(exc_info[2])
1236 finally:
1237 exc_info = None
1239 response = (status, headers)
1240 return buffer.append
1242 app_rv = app(environ, start_response)
1243 close_func = getattr(app_rv, "close", None)
1244 app_iter: t.Iterable[bytes] = iter(app_rv)
1246 # when buffering we emit the close call early and convert the
1247 # application iterator into a regular list
1248 if buffered:
1249 try:
1250 app_iter = list(app_iter)
1251 finally:
1252 if close_func is not None:
1253 close_func()
1255 # otherwise we iterate the application iter until we have a response, chain
1256 # the already received data with the already collected data and wrap it in
1257 # a new `ClosingIterator` if we need to restore a `close` callable from the
1258 # original return value.
1259 else:
1260 for item in app_iter:
1261 buffer.append(item)
1263 if response is not None:
1264 break
1266 if buffer:
1267 app_iter = chain(buffer, app_iter)
1269 if close_func is not None and app_iter is not app_rv:
1270 app_iter = ClosingIterator(app_iter, close_func)
1272 status, headers = response # type: ignore
1273 return app_iter, status, Headers(headers)
1276class TestResponse(Response):
1277 """:class:`~werkzeug.wrappers.Response` subclass that provides extra
1278 information about requests made with the test :class:`Client`.
1280 Test client requests will always return an instance of this class.
1281 If a custom response class is passed to the client, it is
1282 subclassed along with this to support test information.
1284 If the test request included large files, or if the application is
1285 serving a file, call :meth:`close` to close any open files and
1286 prevent Python showing a ``ResourceWarning``.
1288 .. versionchanged:: 2.2
1289 Set the ``default_mimetype`` to None to prevent a mimetype being
1290 assumed if missing.
1292 .. versionchanged:: 2.1
1293 Removed deprecated behavior for treating the response instance
1294 as a tuple.
1296 .. versionadded:: 2.0
1297 Test client methods always return instances of this class.
1298 """
1300 default_mimetype = None
1301 # Don't assume a mimetype, instead use whatever the response provides
1303 request: Request
1304 """A request object with the environ used to make the request that
1305 resulted in this response.
1306 """
1308 history: t.Tuple["TestResponse", ...]
1309 """A list of intermediate responses. Populated when the test request
1310 is made with ``follow_redirects`` enabled.
1311 """
1313 # Tell Pytest to ignore this, it's not a test class.
1314 __test__ = False
1316 def __init__(
1317 self,
1318 response: t.Iterable[bytes],
1319 status: str,
1320 headers: Headers,
1321 request: Request,
1322 history: t.Tuple["TestResponse"] = (), # type: ignore
1323 **kwargs: t.Any,
1324 ) -> None:
1325 super().__init__(response, status, headers, **kwargs)
1326 self.request = request
1327 self.history = history
1328 self._compat_tuple = response, status, headers
1330 @cached_property
1331 def text(self) -> str:
1332 """The response data as text. A shortcut for
1333 ``response.get_data(as_text=True)``.
1335 .. versionadded:: 2.1
1336 """
1337 return self.get_data(as_text=True)