Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/test.py: 27%
573 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1import mimetypes
2import sys
3import typing as t
4from collections import defaultdict
5from datetime import datetime
6from datetime import timedelta
7from http.cookiejar import CookieJar
8from io import BytesIO
9from itertools import chain
10from random import random
11from tempfile import TemporaryFile
12from time import time
13from urllib.request import Request as _UrllibRequest
15from ._internal import _get_environ
16from ._internal import _make_encode_wrapper
17from ._internal import _wsgi_decoding_dance
18from ._internal import _wsgi_encoding_dance
19from .datastructures import Authorization
20from .datastructures import CallbackDict
21from .datastructures import CombinedMultiDict
22from .datastructures import EnvironHeaders
23from .datastructures import FileMultiDict
24from .datastructures import Headers
25from .datastructures import MultiDict
26from .http import dump_cookie
27from .http import dump_options_header
28from .http import parse_options_header
29from .sansio.multipart import Data
30from .sansio.multipart import Epilogue
31from .sansio.multipart import Field
32from .sansio.multipart import File
33from .sansio.multipart import MultipartEncoder
34from .sansio.multipart import Preamble
35from .urls import iri_to_uri
36from .urls import url_encode
37from .urls import url_fix
38from .urls import url_parse
39from .urls import url_unparse
40from .urls import url_unquote
41from .utils import cached_property
42from .utils import get_content_type
43from .wrappers.request import Request
44from .wrappers.response import Response
45from .wsgi import ClosingIterator
46from .wsgi import get_current_url
48if t.TYPE_CHECKING:
49 from _typeshed.wsgi import WSGIApplication
50 from _typeshed.wsgi import WSGIEnvironment
53def stream_encode_multipart(
54 data: t.Mapping[str, t.Any],
55 use_tempfile: bool = True,
56 threshold: int = 1024 * 500,
57 boundary: t.Optional[str] = None,
58 charset: str = "utf-8",
59) -> t.Tuple[t.IO[bytes], int, str]:
60 """Encode a dict of values (either strings or file descriptors or
61 :class:`FileStorage` objects.) into a multipart encoded string stored
62 in a file descriptor.
63 """
64 if boundary is None:
65 boundary = f"---------------WerkzeugFormPart_{time()}{random()}"
67 stream: t.IO[bytes] = BytesIO()
68 total_length = 0
69 on_disk = False
70 write_binary: t.Callable[[bytes], int]
72 if use_tempfile:
74 def write_binary(s: bytes) -> int:
75 nonlocal stream, total_length, on_disk
77 if on_disk:
78 return stream.write(s)
79 else:
80 length = len(s)
82 if length + total_length <= threshold:
83 stream.write(s)
84 else:
85 new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
86 new_stream.write(stream.getvalue()) # type: ignore
87 new_stream.write(s)
88 stream = new_stream
89 on_disk = True
91 total_length += length
92 return length
94 else:
95 write_binary = stream.write
97 encoder = MultipartEncoder(boundary.encode())
98 write_binary(encoder.send_event(Preamble(data=b"")))
99 for key, value in _iter_data(data):
100 reader = getattr(value, "read", None)
101 if reader is not None:
102 filename = getattr(value, "filename", getattr(value, "name", None))
103 content_type = getattr(value, "content_type", None)
104 if content_type is None:
105 content_type = (
106 filename
107 and mimetypes.guess_type(filename)[0]
108 or "application/octet-stream"
109 )
110 headers = value.headers
111 headers.update([("Content-Type", content_type)])
112 if filename is None:
113 write_binary(encoder.send_event(Field(name=key, headers=headers)))
114 else:
115 write_binary(
116 encoder.send_event(
117 File(name=key, filename=filename, headers=headers)
118 )
119 )
120 while True:
121 chunk = reader(16384)
123 if not chunk:
124 break
126 write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
127 else:
128 if not isinstance(value, str):
129 value = str(value)
130 write_binary(encoder.send_event(Field(name=key, headers=Headers())))
131 write_binary(
132 encoder.send_event(Data(data=value.encode(charset), more_data=False))
133 )
135 write_binary(encoder.send_event(Epilogue(data=b"")))
137 length = stream.tell()
138 stream.seek(0)
139 return stream, length, boundary
142def encode_multipart(
143 values: t.Mapping[str, t.Any],
144 boundary: t.Optional[str] = None,
145 charset: str = "utf-8",
146) -> t.Tuple[str, bytes]:
147 """Like `stream_encode_multipart` but returns a tuple in the form
148 (``boundary``, ``data``) where data is bytes.
149 """
150 stream, length, boundary = stream_encode_multipart(
151 values, use_tempfile=False, boundary=boundary, charset=charset
152 )
153 return boundary, stream.read()
156class _TestCookieHeaders:
157 """A headers adapter for cookielib"""
159 def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
160 self.headers = headers
162 def getheaders(self, name: str) -> t.Iterable[str]:
163 headers = []
164 name = name.lower()
165 for k, v in self.headers:
166 if k.lower() == name:
167 headers.append(v)
168 return headers
170 def get_all(
171 self, name: str, default: t.Optional[t.Iterable[str]] = None
172 ) -> t.Iterable[str]:
173 headers = self.getheaders(name)
175 if not headers:
176 return default # type: ignore
178 return headers
181class _TestCookieResponse:
182 """Something that looks like a httplib.HTTPResponse, but is actually just an
183 adapter for our test responses to make them available for cookielib.
184 """
186 def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
187 self.headers = _TestCookieHeaders(headers)
189 def info(self) -> _TestCookieHeaders:
190 return self.headers
193class _TestCookieJar(CookieJar):
194 """A cookielib.CookieJar modified to inject and read cookie headers from
195 and to wsgi environments, and wsgi application responses.
196 """
198 def inject_wsgi(self, environ: "WSGIEnvironment") -> None:
199 """Inject the cookies as client headers into the server's wsgi
200 environment.
201 """
202 cvals = [f"{c.name}={c.value}" for c in self]
204 if cvals:
205 environ["HTTP_COOKIE"] = "; ".join(cvals)
206 else:
207 environ.pop("HTTP_COOKIE", None)
209 def extract_wsgi(
210 self,
211 environ: "WSGIEnvironment",
212 headers: t.Union[Headers, t.List[t.Tuple[str, str]]],
213 ) -> None:
214 """Extract the server's set-cookie headers as cookies into the
215 cookie jar.
216 """
217 self.extract_cookies(
218 _TestCookieResponse(headers), # type: ignore
219 _UrllibRequest(get_current_url(environ)),
220 )
223def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[t.Tuple[str, t.Any]]:
224 """Iterate over a mapping that might have a list of values, yielding
225 all key, value pairs. Almost like iter_multi_items but only allows
226 lists, not tuples, of values so tuples can be used for files.
227 """
228 if isinstance(data, MultiDict):
229 yield from data.items(multi=True)
230 else:
231 for key, value in data.items():
232 if isinstance(value, list):
233 for v in value:
234 yield key, v
235 else:
236 yield key, value
239_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound=MultiDict)
242class EnvironBuilder:
243 """This class can be used to conveniently create a WSGI environment
244 for testing purposes. It can be used to quickly create WSGI environments
245 or request objects from arbitrary data.
247 The signature of this class is also used in some other places as of
248 Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
249 :meth:`Client.open`). Because of this most of the functionality is
250 available through the constructor alone.
252 Files and regular form data can be manipulated independently of each
253 other with the :attr:`form` and :attr:`files` attributes, but are
254 passed with the same argument to the constructor: `data`.
256 `data` can be any of these values:
258 - a `str` or `bytes` object: The object is converted into an
259 :attr:`input_stream`, the :attr:`content_length` is set and you have to
260 provide a :attr:`content_type`.
261 - a `dict` or :class:`MultiDict`: The keys have to be strings. The values
262 have to be either any of the following objects, or a list of any of the
263 following objects:
265 - a :class:`file`-like object: These are converted into
266 :class:`FileStorage` objects automatically.
267 - a `tuple`: The :meth:`~FileMultiDict.add_file` method is called
268 with the key and the unpacked `tuple` items as positional
269 arguments.
270 - a `str`: The string is set as form data for the associated key.
271 - a file-like object: The object content is loaded in memory and then
272 handled like a regular `str` or a `bytes`.
274 :param path: the path of the request. In the WSGI environment this will
275 end up as `PATH_INFO`. If the `query_string` is not defined
276 and there is a question mark in the `path` everything after
277 it is used as query string.
278 :param base_url: the base URL is a URL that is used to extract the WSGI
279 URL scheme, host (server name + server port) and the
280 script root (`SCRIPT_NAME`).
281 :param query_string: an optional string or dict with URL parameters.
282 :param method: the HTTP method to use, defaults to `GET`.
283 :param input_stream: an optional input stream. Do not specify this and
284 `data`. As soon as an input stream is set you can't
285 modify :attr:`args` and :attr:`files` unless you
286 set the :attr:`input_stream` to `None` again.
287 :param content_type: The content type for the request. As of 0.5 you
288 don't have to provide this when specifying files
289 and form data via `data`.
290 :param content_length: The content length for the request. You don't
291 have to specify this when providing data via
292 `data`.
293 :param errors_stream: an optional error stream that is used for
294 `wsgi.errors`. Defaults to :data:`stderr`.
295 :param multithread: controls `wsgi.multithread`. Defaults to `False`.
296 :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`.
297 :param run_once: controls `wsgi.run_once`. Defaults to `False`.
298 :param headers: an optional list or :class:`Headers` object of headers.
299 :param data: a string or dict of form data or a file-object.
300 See explanation above.
301 :param json: An object to be serialized and assigned to ``data``.
302 Defaults the content type to ``"application/json"``.
303 Serialized with the function assigned to :attr:`json_dumps`.
304 :param environ_base: an optional dict of environment defaults.
305 :param environ_overrides: an optional dict of environment overrides.
306 :param charset: the charset used to encode string data.
307 :param auth: An authorization object to use for the
308 ``Authorization`` header value. A ``(username, password)`` tuple
309 is a shortcut for ``Basic`` authorization.
311 .. versionchanged:: 2.1
312 ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
313 header keys in the environ.
315 .. versionchanged:: 2.0
316 ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
317 the query string, not only the path.
319 .. versionchanged:: 2.0
320 The default :attr:`request_class` is ``Request`` instead of
321 ``BaseRequest``.
323 .. versionadded:: 2.0
324 Added the ``auth`` parameter.
326 .. versionadded:: 0.15
327 The ``json`` param and :meth:`json_dumps` method.
329 .. versionadded:: 0.15
330 The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
331 the path before percent-decoding. This is not part of the WSGI
332 PEP, but many WSGI servers include it.
334 .. versionchanged:: 0.6
335 ``path`` and ``base_url`` can now be unicode strings that are
336 encoded with :func:`iri_to_uri`.
337 """
339 #: the server protocol to use. defaults to HTTP/1.1
340 server_protocol = "HTTP/1.1"
342 #: the wsgi version to use. defaults to (1, 0)
343 wsgi_version = (1, 0)
345 #: The default request class used by :meth:`get_request`.
346 request_class = Request
348 import json
350 #: The serialization function used when ``json`` is passed.
351 json_dumps = staticmethod(json.dumps)
352 del json
354 _args: t.Optional[MultiDict]
355 _query_string: t.Optional[str]
356 _input_stream: t.Optional[t.IO[bytes]]
357 _form: t.Optional[MultiDict]
358 _files: t.Optional[FileMultiDict]
360 def __init__(
361 self,
362 path: str = "/",
363 base_url: t.Optional[str] = None,
364 query_string: t.Optional[t.Union[t.Mapping[str, str], str]] = None,
365 method: str = "GET",
366 input_stream: t.Optional[t.IO[bytes]] = None,
367 content_type: t.Optional[str] = None,
368 content_length: t.Optional[int] = None,
369 errors_stream: t.Optional[t.IO[str]] = None,
370 multithread: bool = False,
371 multiprocess: bool = False,
372 run_once: bool = False,
373 headers: t.Optional[t.Union[Headers, t.Iterable[t.Tuple[str, str]]]] = None,
374 data: t.Optional[
375 t.Union[t.IO[bytes], str, bytes, t.Mapping[str, t.Any]]
376 ] = None,
377 environ_base: t.Optional[t.Mapping[str, t.Any]] = None,
378 environ_overrides: t.Optional[t.Mapping[str, t.Any]] = None,
379 charset: str = "utf-8",
380 mimetype: t.Optional[str] = None,
381 json: t.Optional[t.Mapping[str, t.Any]] = None,
382 auth: t.Optional[t.Union[Authorization, t.Tuple[str, str]]] = None,
383 ) -> None:
384 path_s = _make_encode_wrapper(path)
385 if query_string is not None and path_s("?") in path:
386 raise ValueError("Query string is defined in the path and as an argument")
387 request_uri = url_parse(path)
388 if query_string is None and path_s("?") in path:
389 query_string = request_uri.query
390 self.charset = charset
391 self.path = iri_to_uri(request_uri.path)
392 self.request_uri = path
393 if base_url is not None:
394 base_url = url_fix(iri_to_uri(base_url, charset), charset)
395 self.base_url = base_url # type: ignore
396 if isinstance(query_string, (bytes, str)):
397 self.query_string = query_string
398 else:
399 if query_string is None:
400 query_string = MultiDict()
401 elif not isinstance(query_string, MultiDict):
402 query_string = MultiDict(query_string)
403 self.args = query_string
404 self.method = method
405 if headers is None:
406 headers = Headers()
407 elif not isinstance(headers, Headers):
408 headers = Headers(headers)
409 self.headers = headers
410 if content_type is not None:
411 self.content_type = content_type
412 if errors_stream is None:
413 errors_stream = sys.stderr
414 self.errors_stream = errors_stream
415 self.multithread = multithread
416 self.multiprocess = multiprocess
417 self.run_once = run_once
418 self.environ_base = environ_base
419 self.environ_overrides = environ_overrides
420 self.input_stream = input_stream
421 self.content_length = content_length
422 self.closed = False
424 if auth is not None:
425 if isinstance(auth, tuple):
426 auth = Authorization(
427 "basic", {"username": auth[0], "password": auth[1]}
428 )
430 self.headers.set("Authorization", auth.to_header())
432 if json is not None:
433 if data is not None:
434 raise TypeError("can't provide both json and data")
436 data = self.json_dumps(json)
438 if self.content_type is None:
439 self.content_type = "application/json"
441 if data:
442 if input_stream is not None:
443 raise TypeError("can't provide input stream and data")
444 if hasattr(data, "read"):
445 data = data.read()
446 if isinstance(data, str):
447 data = data.encode(self.charset)
448 if isinstance(data, bytes):
449 self.input_stream = BytesIO(data)
450 if self.content_length is None:
451 self.content_length = len(data)
452 else:
453 for key, value in _iter_data(data):
454 if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
455 self._add_file_from_data(key, value)
456 else:
457 self.form.setlistdefault(key).append(value)
459 if mimetype is not None:
460 self.mimetype = mimetype
462 @classmethod
463 def from_environ(
464 cls, environ: "WSGIEnvironment", **kwargs: t.Any
465 ) -> "EnvironBuilder":
466 """Turn an environ dict back into a builder. Any extra kwargs
467 override the args extracted from the environ.
469 .. versionchanged:: 2.0
470 Path and query values are passed through the WSGI decoding
471 dance to avoid double encoding.
473 .. versionadded:: 0.15
474 """
475 headers = Headers(EnvironHeaders(environ))
476 out = {
477 "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
478 "base_url": cls._make_base_url(
479 environ["wsgi.url_scheme"],
480 headers.pop("Host"),
481 _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
482 ),
483 "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
484 "method": environ["REQUEST_METHOD"],
485 "input_stream": environ["wsgi.input"],
486 "content_type": headers.pop("Content-Type", None),
487 "content_length": headers.pop("Content-Length", None),
488 "errors_stream": environ["wsgi.errors"],
489 "multithread": environ["wsgi.multithread"],
490 "multiprocess": environ["wsgi.multiprocess"],
491 "run_once": environ["wsgi.run_once"],
492 "headers": headers,
493 }
494 out.update(kwargs)
495 return cls(**out)
497 def _add_file_from_data(
498 self,
499 key: str,
500 value: t.Union[
501 t.IO[bytes], t.Tuple[t.IO[bytes], str], t.Tuple[t.IO[bytes], str, str]
502 ],
503 ) -> None:
504 """Called in the EnvironBuilder to add files from the data dict."""
505 if isinstance(value, tuple):
506 self.files.add_file(key, *value)
507 else:
508 self.files.add_file(key, value)
510 @staticmethod
511 def _make_base_url(scheme: str, host: str, script_root: str) -> str:
512 return url_unparse((scheme, host, script_root, "", "")).rstrip("/") + "/"
514 @property
515 def base_url(self) -> str:
516 """The base URL is used to extract the URL scheme, host name,
517 port, and root path.
518 """
519 return self._make_base_url(self.url_scheme, self.host, self.script_root)
521 @base_url.setter
522 def base_url(self, value: t.Optional[str]) -> None:
523 if value is None:
524 scheme = "http"
525 netloc = "localhost"
526 script_root = ""
527 else:
528 scheme, netloc, script_root, qs, anchor = url_parse(value)
529 if qs or anchor:
530 raise ValueError("base url must not contain a query string or fragment")
531 self.script_root = script_root.rstrip("/")
532 self.host = netloc
533 self.url_scheme = scheme
535 @property
536 def content_type(self) -> t.Optional[str]:
537 """The content type for the request. Reflected from and to
538 the :attr:`headers`. Do not set if you set :attr:`files` or
539 :attr:`form` for auto detection.
540 """
541 ct = self.headers.get("Content-Type")
542 if ct is None and not self._input_stream:
543 if self._files:
544 return "multipart/form-data"
545 if self._form:
546 return "application/x-www-form-urlencoded"
547 return None
548 return ct
550 @content_type.setter
551 def content_type(self, value: t.Optional[str]) -> None:
552 if value is None:
553 self.headers.pop("Content-Type", None)
554 else:
555 self.headers["Content-Type"] = value
557 @property
558 def mimetype(self) -> t.Optional[str]:
559 """The mimetype (content type without charset etc.)
561 .. versionadded:: 0.14
562 """
563 ct = self.content_type
564 return ct.split(";")[0].strip() if ct else None
566 @mimetype.setter
567 def mimetype(self, value: str) -> None:
568 self.content_type = get_content_type(value, self.charset)
570 @property
571 def mimetype_params(self) -> t.Mapping[str, str]:
572 """The mimetype parameters as dict. For example if the
573 content type is ``text/html; charset=utf-8`` the params would be
574 ``{'charset': 'utf-8'}``.
576 .. versionadded:: 0.14
577 """
579 def on_update(d: CallbackDict) -> None:
580 self.headers["Content-Type"] = dump_options_header(self.mimetype, d)
582 d = parse_options_header(self.headers.get("content-type", ""))[1]
583 return CallbackDict(d, on_update)
585 @property
586 def content_length(self) -> t.Optional[int]:
587 """The content length as integer. Reflected from and to the
588 :attr:`headers`. Do not set if you set :attr:`files` or
589 :attr:`form` for auto detection.
590 """
591 return self.headers.get("Content-Length", type=int)
593 @content_length.setter
594 def content_length(self, value: t.Optional[int]) -> None:
595 if value is None:
596 self.headers.pop("Content-Length", None)
597 else:
598 self.headers["Content-Length"] = str(value)
600 def _get_form(self, name: str, storage: t.Type[_TAnyMultiDict]) -> _TAnyMultiDict:
601 """Common behavior for getting the :attr:`form` and
602 :attr:`files` properties.
604 :param name: Name of the internal cached attribute.
605 :param storage: Storage class used for the data.
606 """
607 if self.input_stream is not None:
608 raise AttributeError("an input stream is defined")
610 rv = getattr(self, name)
612 if rv is None:
613 rv = storage()
614 setattr(self, name, rv)
616 return rv # type: ignore
618 def _set_form(self, name: str, value: MultiDict) -> None:
619 """Common behavior for setting the :attr:`form` and
620 :attr:`files` properties.
622 :param name: Name of the internal cached attribute.
623 :param value: Value to assign to the attribute.
624 """
625 self._input_stream = None
626 setattr(self, name, value)
628 @property
629 def form(self) -> MultiDict:
630 """A :class:`MultiDict` of form values."""
631 return self._get_form("_form", MultiDict)
633 @form.setter
634 def form(self, value: MultiDict) -> None:
635 self._set_form("_form", value)
637 @property
638 def files(self) -> FileMultiDict:
639 """A :class:`FileMultiDict` of uploaded files. Use
640 :meth:`~FileMultiDict.add_file` to add new files.
641 """
642 return self._get_form("_files", FileMultiDict)
644 @files.setter
645 def files(self, value: FileMultiDict) -> None:
646 self._set_form("_files", value)
648 @property
649 def input_stream(self) -> t.Optional[t.IO[bytes]]:
650 """An optional input stream. This is mutually exclusive with
651 setting :attr:`form` and :attr:`files`, setting it will clear
652 those. Do not provide this if the method is not ``POST`` or
653 another method that has a body.
654 """
655 return self._input_stream
657 @input_stream.setter
658 def input_stream(self, value: t.Optional[t.IO[bytes]]) -> None:
659 self._input_stream = value
660 self._form = None
661 self._files = None
663 @property
664 def query_string(self) -> str:
665 """The query string. If you set this to a string
666 :attr:`args` will no longer be available.
667 """
668 if self._query_string is None:
669 if self._args is not None:
670 return url_encode(self._args, charset=self.charset)
671 return ""
672 return self._query_string
674 @query_string.setter
675 def query_string(self, value: t.Optional[str]) -> None:
676 self._query_string = value
677 self._args = None
679 @property
680 def args(self) -> MultiDict:
681 """The URL arguments as :class:`MultiDict`."""
682 if self._query_string is not None:
683 raise AttributeError("a query string is defined")
684 if self._args is None:
685 self._args = MultiDict()
686 return self._args
688 @args.setter
689 def args(self, value: t.Optional[MultiDict]) -> None:
690 self._query_string = None
691 self._args = value
693 @property
694 def server_name(self) -> str:
695 """The server name (read-only, use :attr:`host` to set)"""
696 return self.host.split(":", 1)[0]
698 @property
699 def server_port(self) -> int:
700 """The server port as integer (read-only, use :attr:`host` to set)"""
701 pieces = self.host.split(":", 1)
703 if len(pieces) == 2:
704 try:
705 return int(pieces[1])
706 except ValueError:
707 pass
709 if self.url_scheme == "https":
710 return 443
711 return 80
713 def __del__(self) -> None:
714 try:
715 self.close()
716 except Exception:
717 pass
719 def close(self) -> None:
720 """Closes all files. If you put real :class:`file` objects into the
721 :attr:`files` dict you can call this method to automatically close
722 them all in one go.
723 """
724 if self.closed:
725 return
726 try:
727 files = self.files.values()
728 except AttributeError:
729 files = () # type: ignore
730 for f in files:
731 try:
732 f.close()
733 except Exception:
734 pass
735 self.closed = True
737 def get_environ(self) -> "WSGIEnvironment":
738 """Return the built environ.
740 .. versionchanged:: 0.15
741 The content type and length headers are set based on
742 input stream detection. Previously this only set the WSGI
743 keys.
744 """
745 input_stream = self.input_stream
746 content_length = self.content_length
748 mimetype = self.mimetype
749 content_type = self.content_type
751 if input_stream is not None:
752 start_pos = input_stream.tell()
753 input_stream.seek(0, 2)
754 end_pos = input_stream.tell()
755 input_stream.seek(start_pos)
756 content_length = end_pos - start_pos
757 elif mimetype == "multipart/form-data":
758 input_stream, content_length, boundary = stream_encode_multipart(
759 CombinedMultiDict([self.form, self.files]), charset=self.charset
760 )
761 content_type = f'{mimetype}; boundary="{boundary}"'
762 elif mimetype == "application/x-www-form-urlencoded":
763 form_encoded = url_encode(self.form, charset=self.charset).encode("ascii")
764 content_length = len(form_encoded)
765 input_stream = BytesIO(form_encoded)
766 else:
767 input_stream = BytesIO()
769 result: "WSGIEnvironment" = {}
770 if self.environ_base:
771 result.update(self.environ_base)
773 def _path_encode(x: str) -> str:
774 return _wsgi_encoding_dance(url_unquote(x, self.charset), self.charset)
776 raw_uri = _wsgi_encoding_dance(self.request_uri, self.charset)
777 result.update(
778 {
779 "REQUEST_METHOD": self.method,
780 "SCRIPT_NAME": _path_encode(self.script_root),
781 "PATH_INFO": _path_encode(self.path),
782 "QUERY_STRING": _wsgi_encoding_dance(self.query_string, self.charset),
783 # Non-standard, added by mod_wsgi, uWSGI
784 "REQUEST_URI": raw_uri,
785 # Non-standard, added by gunicorn
786 "RAW_URI": raw_uri,
787 "SERVER_NAME": self.server_name,
788 "SERVER_PORT": str(self.server_port),
789 "HTTP_HOST": self.host,
790 "SERVER_PROTOCOL": self.server_protocol,
791 "wsgi.version": self.wsgi_version,
792 "wsgi.url_scheme": self.url_scheme,
793 "wsgi.input": input_stream,
794 "wsgi.errors": self.errors_stream,
795 "wsgi.multithread": self.multithread,
796 "wsgi.multiprocess": self.multiprocess,
797 "wsgi.run_once": self.run_once,
798 }
799 )
801 headers = self.headers.copy()
802 # Don't send these as headers, they're part of the environ.
803 headers.remove("Content-Type")
804 headers.remove("Content-Length")
806 if content_type is not None:
807 result["CONTENT_TYPE"] = content_type
809 if content_length is not None:
810 result["CONTENT_LENGTH"] = str(content_length)
812 combined_headers = defaultdict(list)
814 for key, value in headers.to_wsgi_list():
815 combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)
817 for key, values in combined_headers.items():
818 result[key] = ", ".join(values)
820 if self.environ_overrides:
821 result.update(self.environ_overrides)
823 return result
825 def get_request(self, cls: t.Optional[t.Type[Request]] = None) -> Request:
826 """Returns a request with the data. If the request class is not
827 specified :attr:`request_class` is used.
829 :param cls: The request wrapper to use.
830 """
831 if cls is None:
832 cls = self.request_class
834 return cls(self.get_environ())
837class ClientRedirectError(Exception):
838 """If a redirect loop is detected when using follow_redirects=True with
839 the :cls:`Client`, then this exception is raised.
840 """
843class Client:
844 """This class allows you to send requests to a wrapped application.
846 The use_cookies parameter indicates whether cookies should be stored and
847 sent for subsequent requests. This is True by default, but passing False
848 will disable this behaviour.
850 If you want to request some subdomain of your application you may set
851 `allow_subdomain_redirects` to `True` as if not no external redirects
852 are allowed.
854 .. versionchanged:: 2.1
855 Removed deprecated behavior of treating the response as a
856 tuple. All data is available as properties on the returned
857 response object.
859 .. versionchanged:: 2.0
860 ``response_wrapper`` is always a subclass of
861 :class:``TestResponse``.
863 .. versionchanged:: 0.5
864 Added the ``use_cookies`` parameter.
865 """
867 def __init__(
868 self,
869 application: "WSGIApplication",
870 response_wrapper: t.Optional[t.Type["Response"]] = None,
871 use_cookies: bool = True,
872 allow_subdomain_redirects: bool = False,
873 ) -> None:
874 self.application = application
876 if response_wrapper in {None, Response}:
877 response_wrapper = TestResponse
878 elif not isinstance(response_wrapper, TestResponse):
879 response_wrapper = type(
880 "WrapperTestResponse",
881 (TestResponse, response_wrapper), # type: ignore
882 {},
883 )
885 self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper)
887 if use_cookies:
888 self.cookie_jar: t.Optional[_TestCookieJar] = _TestCookieJar()
889 else:
890 self.cookie_jar = None
892 self.allow_subdomain_redirects = allow_subdomain_redirects
894 def set_cookie(
895 self,
896 server_name: str,
897 key: str,
898 value: str = "",
899 max_age: t.Optional[t.Union[timedelta, int]] = None,
900 expires: t.Optional[t.Union[str, datetime, int, float]] = None,
901 path: str = "/",
902 domain: t.Optional[str] = None,
903 secure: bool = False,
904 httponly: bool = False,
905 samesite: t.Optional[str] = None,
906 charset: str = "utf-8",
907 ) -> None:
908 """Sets a cookie in the client's cookie jar. The server name
909 is required and has to match the one that is also passed to
910 the open call.
911 """
912 assert self.cookie_jar is not None, "cookies disabled"
913 header = dump_cookie(
914 key,
915 value,
916 max_age,
917 expires,
918 path,
919 domain,
920 secure,
921 httponly,
922 charset,
923 samesite=samesite,
924 )
925 environ = create_environ(path, base_url=f"http://{server_name}")
926 headers = [("Set-Cookie", header)]
927 self.cookie_jar.extract_wsgi(environ, headers)
929 def delete_cookie(
930 self,
931 server_name: str,
932 key: str,
933 path: str = "/",
934 domain: t.Optional[str] = None,
935 secure: bool = False,
936 httponly: bool = False,
937 samesite: t.Optional[str] = None,
938 ) -> None:
939 """Deletes a cookie in the test client."""
940 self.set_cookie(
941 server_name,
942 key,
943 expires=0,
944 max_age=0,
945 path=path,
946 domain=domain,
947 secure=secure,
948 httponly=httponly,
949 samesite=samesite,
950 )
952 def run_wsgi_app(
953 self, environ: "WSGIEnvironment", buffered: bool = False
954 ) -> t.Tuple[t.Iterable[bytes], str, Headers]:
955 """Runs the wrapped WSGI app with the given environment.
957 :meta private:
958 """
959 if self.cookie_jar is not None:
960 self.cookie_jar.inject_wsgi(environ)
962 rv = run_wsgi_app(self.application, environ, buffered=buffered)
964 if self.cookie_jar is not None:
965 self.cookie_jar.extract_wsgi(environ, rv[2])
967 return rv
969 def resolve_redirect(
970 self, response: "TestResponse", buffered: bool = False
971 ) -> "TestResponse":
972 """Perform a new request to the location given by the redirect
973 response to the previous request.
975 :meta private:
976 """
977 scheme, netloc, path, qs, anchor = url_parse(response.location)
978 builder = EnvironBuilder.from_environ(
979 response.request.environ, path=path, query_string=qs
980 )
982 to_name_parts = netloc.split(":", 1)[0].split(".")
983 from_name_parts = builder.server_name.split(".")
985 if to_name_parts != [""]:
986 # The new location has a host, use it for the base URL.
987 builder.url_scheme = scheme
988 builder.host = netloc
989 else:
990 # A local redirect with autocorrect_location_header=False
991 # doesn't have a host, so use the request's host.
992 to_name_parts = from_name_parts
994 # Explain why a redirect to a different server name won't be followed.
995 if to_name_parts != from_name_parts:
996 if to_name_parts[-len(from_name_parts) :] == from_name_parts:
997 if not self.allow_subdomain_redirects:
998 raise RuntimeError("Following subdomain redirects is not enabled.")
999 else:
1000 raise RuntimeError("Following external redirects is not supported.")
1002 path_parts = path.split("/")
1003 root_parts = builder.script_root.split("/")
1005 if path_parts[: len(root_parts)] == root_parts:
1006 # Strip the script root from the path.
1007 builder.path = path[len(builder.script_root) :]
1008 else:
1009 # The new location is not under the script root, so use the
1010 # whole path and clear the previous root.
1011 builder.path = path
1012 builder.script_root = ""
1014 # Only 307 and 308 preserve all of the original request.
1015 if response.status_code not in {307, 308}:
1016 # HEAD is preserved, everything else becomes GET.
1017 if builder.method != "HEAD":
1018 builder.method = "GET"
1020 # Clear the body and the headers that describe it.
1022 if builder.input_stream is not None:
1023 builder.input_stream.close()
1024 builder.input_stream = None
1026 builder.content_type = None
1027 builder.content_length = None
1028 builder.headers.pop("Transfer-Encoding", None)
1030 return self.open(builder, buffered=buffered)
1032 def open(
1033 self,
1034 *args: t.Any,
1035 buffered: bool = False,
1036 follow_redirects: bool = False,
1037 **kwargs: t.Any,
1038 ) -> "TestResponse":
1039 """Generate an environ dict from the given arguments, make a
1040 request to the application using it, and return the response.
1042 :param args: Passed to :class:`EnvironBuilder` to create the
1043 environ for the request. If a single arg is passed, it can
1044 be an existing :class:`EnvironBuilder` or an environ dict.
1045 :param buffered: Convert the iterator returned by the app into
1046 a list. If the iterator has a ``close()`` method, it is
1047 called automatically.
1048 :param follow_redirects: Make additional requests to follow HTTP
1049 redirects until a non-redirect status is returned.
1050 :attr:`TestResponse.history` lists the intermediate
1051 responses.
1053 .. versionchanged:: 2.1
1054 Removed the ``as_tuple`` parameter.
1056 .. versionchanged:: 2.0
1057 ``as_tuple`` is deprecated and will be removed in Werkzeug
1058 2.1. Use :attr:`TestResponse.request` and
1059 ``request.environ`` instead.
1061 .. versionchanged:: 2.0
1062 The request input stream is closed when calling
1063 ``response.close()``. Input streams for redirects are
1064 automatically closed.
1066 .. versionchanged:: 0.5
1067 If a dict is provided as file in the dict for the ``data``
1068 parameter the content type has to be called ``content_type``
1069 instead of ``mimetype``. This change was made for
1070 consistency with :class:`werkzeug.FileWrapper`.
1072 .. versionchanged:: 0.5
1073 Added the ``follow_redirects`` parameter.
1074 """
1075 request: t.Optional["Request"] = None
1077 if not kwargs and len(args) == 1:
1078 arg = args[0]
1080 if isinstance(arg, EnvironBuilder):
1081 request = arg.get_request()
1082 elif isinstance(arg, dict):
1083 request = EnvironBuilder.from_environ(arg).get_request()
1084 elif isinstance(arg, Request):
1085 request = arg
1087 if request is None:
1088 builder = EnvironBuilder(*args, **kwargs)
1090 try:
1091 request = builder.get_request()
1092 finally:
1093 builder.close()
1095 response = self.run_wsgi_app(request.environ, buffered=buffered)
1096 response = self.response_wrapper(*response, request=request)
1098 redirects = set()
1099 history: t.List["TestResponse"] = []
1101 if not follow_redirects:
1102 return response
1104 while response.status_code in {
1105 301,
1106 302,
1107 303,
1108 305,
1109 307,
1110 308,
1111 }:
1112 # Exhaust intermediate response bodies to ensure middleware
1113 # that returns an iterator runs any cleanup code.
1114 if not buffered:
1115 response.make_sequence()
1116 response.close()
1118 new_redirect_entry = (response.location, response.status_code)
1120 if new_redirect_entry in redirects:
1121 raise ClientRedirectError(
1122 f"Loop detected: A {response.status_code} redirect"
1123 f" to {response.location} was already made."
1124 )
1126 redirects.add(new_redirect_entry)
1127 response.history = tuple(history)
1128 history.append(response)
1129 response = self.resolve_redirect(response, buffered=buffered)
1130 else:
1131 # This is the final request after redirects.
1132 response.history = tuple(history)
1133 # Close the input stream when closing the response, in case
1134 # the input is an open temporary file.
1135 response.call_on_close(request.input_stream.close)
1136 return response
1138 def get(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1139 """Call :meth:`open` with ``method`` set to ``GET``."""
1140 kw["method"] = "GET"
1141 return self.open(*args, **kw)
1143 def post(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1144 """Call :meth:`open` with ``method`` set to ``POST``."""
1145 kw["method"] = "POST"
1146 return self.open(*args, **kw)
1148 def put(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1149 """Call :meth:`open` with ``method`` set to ``PUT``."""
1150 kw["method"] = "PUT"
1151 return self.open(*args, **kw)
1153 def delete(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1154 """Call :meth:`open` with ``method`` set to ``DELETE``."""
1155 kw["method"] = "DELETE"
1156 return self.open(*args, **kw)
1158 def patch(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1159 """Call :meth:`open` with ``method`` set to ``PATCH``."""
1160 kw["method"] = "PATCH"
1161 return self.open(*args, **kw)
1163 def options(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1164 """Call :meth:`open` with ``method`` set to ``OPTIONS``."""
1165 kw["method"] = "OPTIONS"
1166 return self.open(*args, **kw)
1168 def head(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1169 """Call :meth:`open` with ``method`` set to ``HEAD``."""
1170 kw["method"] = "HEAD"
1171 return self.open(*args, **kw)
1173 def trace(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
1174 """Call :meth:`open` with ``method`` set to ``TRACE``."""
1175 kw["method"] = "TRACE"
1176 return self.open(*args, **kw)
1178 def __repr__(self) -> str:
1179 return f"<{type(self).__name__} {self.application!r}>"
1182def create_environ(*args: t.Any, **kwargs: t.Any) -> "WSGIEnvironment":
1183 """Create a new WSGI environ dict based on the values passed. The first
1184 parameter should be the path of the request which defaults to '/'. The
1185 second one can either be an absolute path (in that case the host is
1186 localhost:80) or a full path to the request with scheme, netloc port and
1187 the path to the script.
1189 This accepts the same arguments as the :class:`EnvironBuilder`
1190 constructor.
1192 .. versionchanged:: 0.5
1193 This function is now a thin wrapper over :class:`EnvironBuilder` which
1194 was added in 0.5. The `headers`, `environ_base`, `environ_overrides`
1195 and `charset` parameters were added.
1196 """
1197 builder = EnvironBuilder(*args, **kwargs)
1199 try:
1200 return builder.get_environ()
1201 finally:
1202 builder.close()
1205def run_wsgi_app(
1206 app: "WSGIApplication", environ: "WSGIEnvironment", buffered: bool = False
1207) -> t.Tuple[t.Iterable[bytes], str, Headers]:
1208 """Return a tuple in the form (app_iter, status, headers) of the
1209 application output. This works best if you pass it an application that
1210 returns an iterator all the time.
1212 Sometimes applications may use the `write()` callable returned
1213 by the `start_response` function. This tries to resolve such edge
1214 cases automatically. But if you don't get the expected output you
1215 should set `buffered` to `True` which enforces buffering.
1217 If passed an invalid WSGI application the behavior of this function is
1218 undefined. Never pass non-conforming WSGI applications to this function.
1220 :param app: the application to execute.
1221 :param buffered: set to `True` to enforce buffering.
1222 :return: tuple in the form ``(app_iter, status, headers)``
1223 """
1224 # Copy environ to ensure any mutations by the app (ProxyFix, for
1225 # example) don't affect subsequent requests (such as redirects).
1226 environ = _get_environ(environ).copy()
1227 status: str
1228 response: t.Optional[t.Tuple[str, t.List[t.Tuple[str, str]]]] = None
1229 buffer: t.List[bytes] = []
1231 def start_response(status, headers, exc_info=None): # type: ignore
1232 nonlocal response
1234 if exc_info:
1235 try:
1236 raise exc_info[1].with_traceback(exc_info[2])
1237 finally:
1238 exc_info = None
1240 response = (status, headers)
1241 return buffer.append
1243 app_rv = app(environ, start_response)
1244 close_func = getattr(app_rv, "close", None)
1245 app_iter: t.Iterable[bytes] = iter(app_rv)
1247 # when buffering we emit the close call early and convert the
1248 # application iterator into a regular list
1249 if buffered:
1250 try:
1251 app_iter = list(app_iter)
1252 finally:
1253 if close_func is not None:
1254 close_func()
1256 # otherwise we iterate the application iter until we have a response, chain
1257 # the already received data with the already collected data and wrap it in
1258 # a new `ClosingIterator` if we need to restore a `close` callable from the
1259 # original return value.
1260 else:
1261 for item in app_iter:
1262 buffer.append(item)
1264 if response is not None:
1265 break
1267 if buffer:
1268 app_iter = chain(buffer, app_iter)
1270 if close_func is not None and app_iter is not app_rv:
1271 app_iter = ClosingIterator(app_iter, close_func)
1273 status, headers = response # type: ignore
1274 return app_iter, status, Headers(headers)
1277class TestResponse(Response):
1278 """:class:`~werkzeug.wrappers.Response` subclass that provides extra
1279 information about requests made with the test :class:`Client`.
1281 Test client requests will always return an instance of this class.
1282 If a custom response class is passed to the client, it is
1283 subclassed along with this to support test information.
1285 If the test request included large files, or if the application is
1286 serving a file, call :meth:`close` to close any open files and
1287 prevent Python showing a ``ResourceWarning``.
1289 .. versionchanged:: 2.2
1290 Set the ``default_mimetype`` to None to prevent a mimetype being
1291 assumed if missing.
1293 .. versionchanged:: 2.1
1294 Removed deprecated behavior for treating the response instance
1295 as a tuple.
1297 .. versionadded:: 2.0
1298 Test client methods always return instances of this class.
1299 """
1301 default_mimetype = None
1302 # Don't assume a mimetype, instead use whatever the response provides
1304 request: Request
1305 """A request object with the environ used to make the request that
1306 resulted in this response.
1307 """
1309 history: t.Tuple["TestResponse", ...]
1310 """A list of intermediate responses. Populated when the test request
1311 is made with ``follow_redirects`` enabled.
1312 """
1314 # Tell Pytest to ignore this, it's not a test class.
1315 __test__ = False
1317 def __init__(
1318 self,
1319 response: t.Iterable[bytes],
1320 status: str,
1321 headers: Headers,
1322 request: Request,
1323 history: t.Tuple["TestResponse"] = (), # type: ignore
1324 **kwargs: t.Any,
1325 ) -> None:
1326 super().__init__(response, status, headers, **kwargs)
1327 self.request = request
1328 self.history = history
1329 self._compat_tuple = response, status, headers
1331 @cached_property
1332 def text(self) -> str:
1333 """The response data as text. A shortcut for
1334 ``response.get_data(as_text=True)``.
1336 .. versionadded:: 2.1
1337 """
1338 return self.get_data(as_text=True)