Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/test.py: 46%
568 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
1import mimetypes
2import sys
3import typing as t
4from collections import defaultdict
5from datetime import datetime
6from datetime import timedelta
7from http.cookiejar import CookieJar
8from io import BytesIO
9from itertools import chain
10from random import random
11from tempfile import TemporaryFile
12from time import time
13from urllib.request import Request as _UrllibRequest
15from ._internal import _get_environ
16from ._internal import _make_encode_wrapper
17from ._internal import _wsgi_decoding_dance
18from ._internal import _wsgi_encoding_dance
19from .datastructures import Authorization
20from .datastructures import CallbackDict
21from .datastructures import CombinedMultiDict
22from .datastructures import EnvironHeaders
23from .datastructures import FileMultiDict
24from .datastructures import Headers
25from .datastructures import MultiDict
26from .http import dump_cookie
27from .http import dump_options_header
28from .http import parse_options_header
29from .sansio.multipart import Data
30from .sansio.multipart import Epilogue
31from .sansio.multipart import Field
32from .sansio.multipart import File
33from .sansio.multipart import MultipartEncoder
34from .sansio.multipart import Preamble
35from .urls import iri_to_uri
36from .urls import url_encode
37from .urls import url_fix
38from .urls import url_parse
39from .urls import url_unparse
40from .urls import url_unquote
41from .utils import cached_property
42from .utils import get_content_type
43from .wrappers.request import Request
44from .wrappers.response import Response
45from .wsgi import ClosingIterator
46from .wsgi import get_current_url
48if t.TYPE_CHECKING:
49 from _typeshed.wsgi import WSGIApplication
50 from _typeshed.wsgi import WSGIEnvironment
def stream_encode_multipart(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: t.Optional[str] = None,
    charset: str = "utf-8",
) -> t.Tuple[t.IO[bytes], int, str]:
    """Serialize ``data`` as a multipart body written to a binary stream.

    Values that expose a ``read`` attribute are written as file parts,
    read in chunks of 16384; every other value is converted with
    ``str()`` and encoded with ``charset`` as a plain field.

    :param data: Mapping of names to values, iterated with
        :func:`_iter_data`.
    :param use_tempfile: When true, the body is moved from memory into a
        temporary file as soon as a write would push the buffered size
        past ``threshold``.
    :param threshold: Number of bytes that may be buffered in memory
        before spilling to the temporary file.
    :param boundary: Multipart boundary. When omitted, one is generated
        from the current time and a random number.
    :param charset: Encoding applied to non-file values.
    :return: ``(stream, length, boundary)``; ``stream`` is rewound to
        position 0 and ``length`` is the stream position after the last
        write.
    """
    if boundary is None:
        boundary = f"---------------WerkzeugFormPart_{time()}{random()}"

    body: t.IO[bytes] = BytesIO()
    buffered = 0
    spilled = False
    write_binary: t.Callable[[bytes], int]

    if use_tempfile:

        def write_binary(payload: bytes) -> int:
            nonlocal body, buffered, spilled

            # After the switch to disk no more bookkeeping is required.
            if spilled:
                return body.write(payload)

            size = len(payload)

            if size + buffered > threshold:
                # Over the limit: copy what is buffered so far into a
                # temporary file and keep writing there.
                disk_file = t.cast(t.IO[bytes], TemporaryFile("wb+"))
                disk_file.write(body.getvalue())  # type: ignore
                disk_file.write(payload)
                body = disk_file
                spilled = True
            else:
                body.write(payload)

            buffered += size
            return size

    else:
        write_binary = body.write

    encoder = MultipartEncoder(boundary.encode())

    def emit(event: t.Any) -> None:
        # Encode a single multipart event and append it to the body.
        write_binary(encoder.send_event(event))

    emit(Preamble(data=b""))

    for key, value in _iter_data(data):
        reader = getattr(value, "read", None)

        if reader is None:
            # Plain form field: stringify, then encode.
            text = value if isinstance(value, str) else str(value)
            emit(Field(name=key, headers=Headers()))
            emit(Data(data=text.encode(charset), more_data=False))
            continue

        # File-like value: work out its name and content type.
        filename = getattr(value, "filename", getattr(value, "name", None))
        content_type = getattr(value, "content_type", None)

        if content_type is None:
            guessed = mimetypes.guess_type(filename)[0] if filename else None
            content_type = guessed or "application/octet-stream"

        headers = Headers([("Content-Type", content_type)])

        if filename is None:
            emit(Field(name=key, headers=headers))
        else:
            emit(File(name=key, filename=filename, headers=headers))

        chunk = reader(16384)

        while chunk:
            emit(Data(data=chunk, more_data=True))
            chunk = reader(16384)

    emit(Epilogue(data=b""))

    length = body.tell()
    body.seek(0)
    return body, length, boundary
def encode_multipart(
    values: t.Mapping[str, t.Any],
    boundary: t.Optional[str] = None,
    charset: str = "utf-8",
) -> t.Tuple[str, bytes]:
    """In-memory variant of :func:`stream_encode_multipart`.

    The body is never spilled to a temporary file; it is read back from
    the stream and returned as bytes.

    :param values: Mapping of names to values to encode.
    :param boundary: Multipart boundary, passed through unchanged.
    :param charset: Encoding applied to non-file values.
    :return: ``(boundary, data)`` where ``data`` is the encoded body.
    """
    encoded = stream_encode_multipart(
        values, use_tempfile=False, boundary=boundary, charset=charset
    )
    # The middle element (the length) is not needed here.
    body, used_boundary = encoded[0], encoded[2]
    return used_boundary, body.read()
class _TestCookieHeaders:
    """Adapter exposing ``(name, value)`` header pairs through the
    ``getheaders`` / ``get_all`` lookups used by cookielib.
    """

    def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
        self.headers = headers

    def getheaders(self, name: str) -> t.Iterable[str]:
        """Return every value stored under ``name``, compared
        case-insensitively, in the order the headers are iterated.
        """
        wanted = name.lower()
        return [value for key, value in self.headers if key.lower() == wanted]

    def get_all(
        self, name: str, default: t.Optional[t.Iterable[str]] = None
    ) -> t.Iterable[str]:
        """Like :meth:`getheaders`, but return ``default`` when no header
        matches ``name``.
        """
        found = self.getheaders(name)
        return found if found else default  # type: ignore
class _TestCookieResponse:
    """Minimal stand-in for an ``httplib.HTTPResponse``: it only wraps the
    headers of a test response so that cookielib can read them.
    """

    def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
        # Wrap the raw headers in the adapter handed out by info().
        wrapped = _TestCookieHeaders(headers)
        self.headers = wrapped

    def info(self) -> _TestCookieHeaders:
        """Return the wrapped headers."""
        return self.headers
class _TestCookieJar(CookieJar):
    """A :class:`~http.cookiejar.CookieJar` that can write its cookies
    into a WSGI environ and read ``Set-Cookie`` headers back from a WSGI
    application response.
    """

    def inject_wsgi(self, environ: "WSGIEnvironment") -> None:
        """Store the jar's cookies in ``environ["HTTP_COOKIE"]``, or drop
        that key when the jar holds no cookies.
        """
        cookie_header = "; ".join(f"{c.name}={c.value}" for c in self)

        if not cookie_header:
            environ.pop("HTTP_COOKIE", None)
        else:
            environ["HTTP_COOKIE"] = cookie_header

    def extract_wsgi(
        self,
        environ: "WSGIEnvironment",
        headers: t.Union[Headers, t.List[t.Tuple[str, str]]],
    ) -> None:
        """Feed the response ``headers`` to the jar, using the current
        URL of ``environ`` as the request they belong to.
        """
        response = _TestCookieResponse(headers)
        request = _UrllibRequest(get_current_url(environ))
        self.extract_cookies(response, request)  # type: ignore
def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[t.Tuple[str, t.Any]]:
    """Yield every ``(key, value)`` pair of ``data``, flattening values
    that are lists into one pair per item.

    Only ``list`` values are expanded. Tuples are yielded as they are so
    that they remain available to describe files.
    """
    if isinstance(data, MultiDict):
        yield from data.items(multi=True)
        return

    for key, value in data.items():
        # Treat a single value as a one-item list so one loop serves both.
        values = value if isinstance(value, list) else [value]

        for item in values:
            yield key, item
# Type variable constrained to MultiDict and its subclasses; it lets
# EnvironBuilder._get_form return the same storage class it is given.
_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound=MultiDict)
class EnvironBuilder:
    """This class can be used to conveniently create a WSGI environment
    for testing purposes. It can be used to quickly create WSGI environments
    or request objects from arbitrary data.

    The signature of this class is also used in some other places as of
    Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
    :meth:`Client.open`). Because of this most of the functionality is
    available through the constructor alone.

    Files and regular form data can be manipulated independently of each
    other with the :attr:`form` and :attr:`files` attributes, but are
    passed with the same argument to the constructor: `data`.

    `data` can be any of these values:

    - a `str` or `bytes` object: The object is converted into an
      :attr:`input_stream`, the :attr:`content_length` is set and you have to
      provide a :attr:`content_type`.
    - a `dict` or :class:`MultiDict`: The keys have to be strings. The values
      have to be either any of the following objects, or a list of any of the
      following objects:

      - a :class:`file`-like object: These are converted into
        :class:`FileStorage` objects automatically.
      - a `tuple`: The :meth:`~FileMultiDict.add_file` method is called
        with the key and the unpacked `tuple` items as positional
        arguments.
      - a `str`: The string is set as form data for the associated key.
    - a file-like object: The object content is loaded in memory and then
      handled like a regular `str` or a `bytes`.

    :param path: the path of the request. In the WSGI environment this will
                 end up as `PATH_INFO`. If the `query_string` is not defined
                 and there is a question mark in the `path` everything after
                 it is used as query string.
    :param base_url: the base URL is a URL that is used to extract the WSGI
                     URL scheme, host (server name + server port) and the
                     script root (`SCRIPT_NAME`).
    :param query_string: an optional string or dict with URL parameters.
    :param method: the HTTP method to use, defaults to `GET`.
    :param input_stream: an optional input stream. Do not specify this and
                         `data`. As soon as an input stream is set you can't
                         modify :attr:`args` and :attr:`files` unless you
                         set the :attr:`input_stream` to `None` again.
    :param content_type: The content type for the request. As of 0.5 you
                         don't have to provide this when specifying files
                         and form data via `data`.
    :param content_length: The content length for the request. You don't
                           have to specify this when providing data via
                           `data`.
    :param errors_stream: an optional error stream that is used for
                          `wsgi.errors`. Defaults to :data:`stderr`.
    :param multithread: controls `wsgi.multithread`. Defaults to `False`.
    :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`.
    :param run_once: controls `wsgi.run_once`. Defaults to `False`.
    :param headers: an optional list or :class:`Headers` object of headers.
    :param data: a string or dict of form data or a file-object.
                 See explanation above.
    :param json: An object to be serialized and assigned to ``data``.
        Defaults the content type to ``"application/json"``.
        Serialized with the function assigned to :attr:`json_dumps`.
    :param environ_base: an optional dict of environment defaults.
    :param environ_overrides: an optional dict of environment overrides.
    :param charset: the charset used to encode string data.
    :param mimetype: an optional mimetype. When given it is assigned to
        :attr:`mimetype` after ``data`` has been processed, which sets
        the content type.
    :param auth: An authorization object to use for the
        ``Authorization`` header value. A ``(username, password)`` tuple
        is a shortcut for ``Basic`` authorization.

    .. versionchanged:: 2.1
        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
        header keys in the environ.

    .. versionchanged:: 2.0
        ``REQUEST_URI`` and ``RAW_URI`` are the full raw URI including
        the query string, not only the path.

    .. versionchanged:: 2.0
        The default :attr:`request_class` is ``Request`` instead of
        ``BaseRequest``.

    .. versionadded:: 2.0
       Added the ``auth`` parameter.

    .. versionadded:: 0.15
        The ``json`` param and :meth:`json_dumps` method.

    .. versionadded:: 0.15
        The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
        the path before percent-decoding. This is not part of the WSGI
        PEP, but many WSGI servers include it.

    .. versionchanged:: 0.6
       ``path`` and ``base_url`` can now be unicode strings that are
       encoded with :func:`iri_to_uri`.
    """

    #: the server protocol to use.  defaults to HTTP/1.1
    server_protocol = "HTTP/1.1"

    #: the wsgi version to use.  defaults to (1, 0)
    wsgi_version = (1, 0)

    #: The default request class used by :meth:`get_request`.
    request_class = Request

    # Imported in the class body and deleted again below so that ``json``
    # does not remain as a class attribute.
    import json

    #: The serialization function used when ``json`` is passed.
    json_dumps = staticmethod(json.dumps)
    del json

    _args: t.Optional[MultiDict]
    _query_string: t.Optional[str]
    _input_stream: t.Optional[t.IO[bytes]]
    _form: t.Optional[MultiDict]
    _files: t.Optional[FileMultiDict]

    def __init__(
        self,
        path: str = "/",
        base_url: t.Optional[str] = None,
        query_string: t.Optional[t.Union[t.Mapping[str, str], str]] = None,
        method: str = "GET",
        input_stream: t.Optional[t.IO[bytes]] = None,
        content_type: t.Optional[str] = None,
        content_length: t.Optional[int] = None,
        errors_stream: t.Optional[t.IO[str]] = None,
        multithread: bool = False,
        multiprocess: bool = False,
        run_once: bool = False,
        headers: t.Optional[t.Union[Headers, t.Iterable[t.Tuple[str, str]]]] = None,
        data: t.Optional[
            t.Union[t.IO[bytes], str, bytes, t.Mapping[str, t.Any]]
        ] = None,
        environ_base: t.Optional[t.Mapping[str, t.Any]] = None,
        environ_overrides: t.Optional[t.Mapping[str, t.Any]] = None,
        charset: str = "utf-8",
        mimetype: t.Optional[str] = None,
        json: t.Optional[t.Mapping[str, t.Any]] = None,
        auth: t.Optional[t.Union[Authorization, t.Tuple[str, str]]] = None,
    ) -> None:
        path_s = _make_encode_wrapper(path)
        if query_string is not None and path_s("?") in path:
            raise ValueError("Query string is defined in the path and as an argument")
        request_uri = url_parse(path)
        if query_string is None and path_s("?") in path:
            query_string = request_uri.query
        self.charset = charset
        self.path = iri_to_uri(request_uri.path)
        self.request_uri = path
        if base_url is not None:
            base_url = url_fix(iri_to_uri(base_url, charset), charset)
        self.base_url = base_url  # type: ignore
        if isinstance(query_string, (bytes, str)):
            self.query_string = query_string
        else:
            if query_string is None:
                query_string = MultiDict()
            elif not isinstance(query_string, MultiDict):
                query_string = MultiDict(query_string)
            self.args = query_string
        self.method = method
        if headers is None:
            headers = Headers()
        elif not isinstance(headers, Headers):
            headers = Headers(headers)
        self.headers = headers
        if content_type is not None:
            self.content_type = content_type
        if errors_stream is None:
            errors_stream = sys.stderr
        self.errors_stream = errors_stream
        self.multithread = multithread
        self.multiprocess = multiprocess
        self.run_once = run_once
        self.environ_base = environ_base
        self.environ_overrides = environ_overrides
        self.input_stream = input_stream
        self.content_length = content_length
        self.closed = False

        if auth is not None:
            if isinstance(auth, tuple):
                auth = Authorization(
                    "basic", {"username": auth[0], "password": auth[1]}
                )

            self.headers.set("Authorization", auth.to_header())

        if json is not None:
            if data is not None:
                raise TypeError("can't provide both json and data")

            data = self.json_dumps(json)

            if self.content_type is None:
                self.content_type = "application/json"

        if data:
            if input_stream is not None:
                raise TypeError("can't provide input stream and data")
            if hasattr(data, "read"):
                data = data.read()  # type: ignore
            if isinstance(data, str):
                data = data.encode(self.charset)
            if isinstance(data, bytes):
                self.input_stream = BytesIO(data)
                if self.content_length is None:
                    self.content_length = len(data)
            else:
                for key, value in _iter_data(data):  # type: ignore
                    if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
                        self._add_file_from_data(key, value)
                    else:
                        self.form.setlistdefault(key).append(value)

        if mimetype is not None:
            self.mimetype = mimetype

    @classmethod
    def from_environ(
        cls, environ: "WSGIEnvironment", **kwargs: t.Any
    ) -> "EnvironBuilder":
        """Turn an environ dict back into a builder. Any extra kwargs
        override the args extracted from the environ.

        .. versionchanged:: 2.0
            Path and query values are passed through the WSGI decoding
            dance to avoid double encoding.

        .. versionadded:: 0.15
        """
        headers = Headers(EnvironHeaders(environ))
        out = {
            "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
            "base_url": cls._make_base_url(
                environ["wsgi.url_scheme"],
                headers.pop("Host"),
                _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
            ),
            "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
            "method": environ["REQUEST_METHOD"],
            "input_stream": environ["wsgi.input"],
            "content_type": headers.pop("Content-Type", None),
            "content_length": headers.pop("Content-Length", None),
            "errors_stream": environ["wsgi.errors"],
            "multithread": environ["wsgi.multithread"],
            "multiprocess": environ["wsgi.multiprocess"],
            "run_once": environ["wsgi.run_once"],
            "headers": headers,
        }
        out.update(kwargs)
        return cls(**out)

    def _add_file_from_data(
        self,
        key: str,
        value: t.Union[
            t.IO[bytes], t.Tuple[t.IO[bytes], str], t.Tuple[t.IO[bytes], str, str]
        ],
    ) -> None:
        """Called in the EnvironBuilder to add files from the data dict."""
        if isinstance(value, tuple):
            self.files.add_file(key, *value)
        else:
            self.files.add_file(key, value)

    @staticmethod
    def _make_base_url(scheme: str, host: str, script_root: str) -> str:
        """Join scheme, host and script root into a URL that always ends
        with exactly one trailing slash.
        """
        return url_unparse((scheme, host, script_root, "", "")).rstrip("/") + "/"

    @property
    def base_url(self) -> str:
        """The base URL is used to extract the URL scheme, host name,
        port, and root path.
        """
        return self._make_base_url(self.url_scheme, self.host, self.script_root)

    @base_url.setter
    def base_url(self, value: t.Optional[str]) -> None:
        if value is None:
            scheme = "http"
            netloc = "localhost"
            script_root = ""
        else:
            scheme, netloc, script_root, qs, anchor = url_parse(value)
            if qs or anchor:
                raise ValueError("base url must not contain a query string or fragment")
        self.script_root = script_root.rstrip("/")
        self.host = netloc
        self.url_scheme = scheme

    @property
    def content_type(self) -> t.Optional[str]:
        """The content type for the request.  Reflected from and to
        the :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        ct = self.headers.get("Content-Type")
        if ct is None and not self._input_stream:
            # No explicit header and no raw body: derive the type from
            # whichever of files / form holds data.
            if self._files:
                return "multipart/form-data"
            if self._form:
                return "application/x-www-form-urlencoded"
            return None
        return ct

    @content_type.setter
    def content_type(self, value: t.Optional[str]) -> None:
        if value is None:
            self.headers.pop("Content-Type", None)
        else:
            self.headers["Content-Type"] = value

    @property
    def mimetype(self) -> t.Optional[str]:
        """The mimetype (content type without charset etc.)

        .. versionadded:: 0.14
        """
        ct = self.content_type
        return ct.split(";")[0].strip() if ct else None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        self.content_type = get_content_type(value, self.charset)

    @property
    def mimetype_params(self) -> t.Mapping[str, str]:
        """The mimetype parameters as dict.  For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.14
        """

        def on_update(d: CallbackDict) -> None:
            # Write modified parameters back into the Content-Type header.
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        d = parse_options_header(self.headers.get("content-type", ""))[1]
        return CallbackDict(d, on_update)

    @property
    def content_length(self) -> t.Optional[int]:
        """The content length as integer.  Reflected from and to the
        :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        return self.headers.get("Content-Length", type=int)

    @content_length.setter
    def content_length(self, value: t.Optional[int]) -> None:
        if value is None:
            self.headers.pop("Content-Length", None)
        else:
            self.headers["Content-Length"] = str(value)

    def _get_form(self, name: str, storage: t.Type[_TAnyMultiDict]) -> _TAnyMultiDict:
        """Common behavior for getting the :attr:`form` and
        :attr:`files` properties.

        :param name: Name of the internal cached attribute.
        :param storage: Storage class used for the data.
        :raises AttributeError: If an input stream is defined.
        """
        if self.input_stream is not None:
            raise AttributeError("an input stream is defined")

        rv = getattr(self, name)

        if rv is None:
            # Create the storage lazily on first access.
            rv = storage()
            setattr(self, name, rv)

        return rv  # type: ignore

    def _set_form(self, name: str, value: MultiDict) -> None:
        """Common behavior for setting the :attr:`form` and
        :attr:`files` properties.

        :param name: Name of the internal cached attribute.
        :param value: Value to assign to the attribute.
        """
        self._input_stream = None
        setattr(self, name, value)

    @property
    def form(self) -> MultiDict:
        """A :class:`MultiDict` of form values."""
        return self._get_form("_form", MultiDict)

    @form.setter
    def form(self, value: MultiDict) -> None:
        self._set_form("_form", value)

    @property
    def files(self) -> FileMultiDict:
        """A :class:`FileMultiDict` of uploaded files. Use
        :meth:`~FileMultiDict.add_file` to add new files.
        """
        return self._get_form("_files", FileMultiDict)

    @files.setter
    def files(self, value: FileMultiDict) -> None:
        self._set_form("_files", value)

    @property
    def input_stream(self) -> t.Optional[t.IO[bytes]]:
        """An optional input stream. This is mutually exclusive with
        setting :attr:`form` and :attr:`files`, setting it will clear
        those. Do not provide this if the method is not ``POST`` or
        another method that has a body.
        """
        return self._input_stream

    @input_stream.setter
    def input_stream(self, value: t.Optional[t.IO[bytes]]) -> None:
        self._input_stream = value
        self._form = None
        self._files = None

    @property
    def query_string(self) -> str:
        """The query string.  If you set this to a string
        :attr:`args` will no longer be available.
        """
        if self._query_string is None:
            if self._args is not None:
                return url_encode(self._args, charset=self.charset)
            return ""
        return self._query_string

    @query_string.setter
    def query_string(self, value: t.Optional[str]) -> None:
        self._query_string = value
        self._args = None

    @property
    def args(self) -> MultiDict:
        """The URL arguments as :class:`MultiDict`."""
        if self._query_string is not None:
            raise AttributeError("a query string is defined")
        if self._args is None:
            self._args = MultiDict()
        return self._args

    @args.setter
    def args(self, value: t.Optional[MultiDict]) -> None:
        self._query_string = None
        self._args = value

    @property
    def server_name(self) -> str:
        """The server name (read-only, use :attr:`host` to set)"""
        return self.host.split(":", 1)[0]

    @property
    def server_port(self) -> int:
        """The server port as integer (read-only, use :attr:`host` to set)"""
        pieces = self.host.split(":", 1)
        if len(pieces) == 2 and pieces[1].isdigit():
            return int(pieces[1])
        if self.url_scheme == "https":
            return 443
        return 80

    def __del__(self) -> None:
        # Best-effort cleanup: errors are ignored on purpose, for example
        # when ``__init__`` raised before all attributes were assigned.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """Closes all files.  If you put real :class:`file` objects into the
        :attr:`files` dict you can call this method to automatically close
        them all in one go.
        """
        if self.closed:
            return
        try:
            # ``MultiDict.values()`` only yields the first value stored
            # for each key, so iterate every pair instead; otherwise
            # additional files added under the same key stay open.
            files = [f for _, f in _iter_data(self.files)]
        except AttributeError:
            # Raised by the ``files`` property when an input stream is
            # defined; there is nothing to close in that case.
            files = []
        for f in files:
            try:
                f.close()
            except Exception:
                # Closing is best effort; one failing file must not stop
                # the remaining files from being closed.
                pass
        self.closed = True

    def get_environ(self) -> "WSGIEnvironment":
        """Return the built environ.

        .. versionchanged:: 0.15
            The content type and length headers are set based on
            input stream detection. Previously this only set the WSGI
            keys.
        """
        input_stream = self.input_stream
        content_length = self.content_length

        mimetype = self.mimetype
        content_type = self.content_type

        if input_stream is not None:
            # Measure the bytes left in the stream without consuming them.
            start_pos = input_stream.tell()
            input_stream.seek(0, 2)
            end_pos = input_stream.tell()
            input_stream.seek(start_pos)
            content_length = end_pos - start_pos
        elif mimetype == "multipart/form-data":
            input_stream, content_length, boundary = stream_encode_multipart(
                CombinedMultiDict([self.form, self.files]), charset=self.charset
            )
            content_type = f'{mimetype}; boundary="{boundary}"'
        elif mimetype == "application/x-www-form-urlencoded":
            form_encoded = url_encode(self.form, charset=self.charset).encode("ascii")
            content_length = len(form_encoded)
            input_stream = BytesIO(form_encoded)
        else:
            input_stream = BytesIO()

        result: "WSGIEnvironment" = {}
        if self.environ_base:
            result.update(self.environ_base)

        def _path_encode(x: str) -> str:
            return _wsgi_encoding_dance(url_unquote(x, self.charset), self.charset)

        raw_uri = _wsgi_encoding_dance(self.request_uri, self.charset)
        result.update(
            {
                "REQUEST_METHOD": self.method,
                "SCRIPT_NAME": _path_encode(self.script_root),
                "PATH_INFO": _path_encode(self.path),
                "QUERY_STRING": _wsgi_encoding_dance(self.query_string, self.charset),
                # Non-standard, added by mod_wsgi, uWSGI
                "REQUEST_URI": raw_uri,
                # Non-standard, added by gunicorn
                "RAW_URI": raw_uri,
                "SERVER_NAME": self.server_name,
                "SERVER_PORT": str(self.server_port),
                "HTTP_HOST": self.host,
                "SERVER_PROTOCOL": self.server_protocol,
                "wsgi.version": self.wsgi_version,
                "wsgi.url_scheme": self.url_scheme,
                "wsgi.input": input_stream,
                "wsgi.errors": self.errors_stream,
                "wsgi.multithread": self.multithread,
                "wsgi.multiprocess": self.multiprocess,
                "wsgi.run_once": self.run_once,
            }
        )

        headers = self.headers.copy()
        # Don't send these as headers, they're part of the environ.
        headers.remove("Content-Type")
        headers.remove("Content-Length")

        if content_type is not None:
            result["CONTENT_TYPE"] = content_type

        if content_length is not None:
            result["CONTENT_LENGTH"] = str(content_length)

        # Headers that occur more than once are joined into a single
        # comma separated ``HTTP_*`` value.
        combined_headers = defaultdict(list)

        for key, value in headers.to_wsgi_list():
            combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)

        for key, values in combined_headers.items():
            result[key] = ", ".join(values)

        if self.environ_overrides:
            result.update(self.environ_overrides)

        return result

    def get_request(self, cls: t.Optional[t.Type[Request]] = None) -> Request:
        """Returns a request with the data.  If the request class is not
        specified :attr:`request_class` is used.

        :param cls: The request wrapper to use.
        """
        if cls is None:
            cls = self.request_class

        return cls(self.get_environ())
class ClientRedirectError(Exception):
    """If a redirect loop is detected when using follow_redirects=True with
    the :class:`Client`, then this exception is raised.
    """
837class Client:
838 """This class allows you to send requests to a wrapped application.
840 The use_cookies parameter indicates whether cookies should be stored and
841 sent for subsequent requests. This is True by default, but passing False
842 will disable this behaviour.
844 If you want to request some subdomain of your application you may set
845 `allow_subdomain_redirects` to `True` as if not no external redirects
846 are allowed.
848 .. versionchanged:: 2.1
849 Removed deprecated behavior of treating the response as a
850 tuple. All data is available as properties on the returned
851 response object.
853 .. versionchanged:: 2.0
854 ``response_wrapper`` is always a subclass of
855 :class:``TestResponse``.
857 .. versionchanged:: 0.5
858 Added the ``use_cookies`` parameter.
859 """
def __init__(
    self,
    application: "WSGIApplication",
    response_wrapper: t.Optional[t.Type["Response"]] = None,
    use_cookies: bool = True,
    allow_subdomain_redirects: bool = False,
) -> None:
    """Create a test client for ``application``.

    :param application: The WSGI application that requests are sent to.
    :param response_wrapper: Response class used for returned
        responses. ``None`` and ``Response`` select ``TestResponse``; a
        class that already subclasses ``TestResponse`` is used as is;
        any other class is combined with ``TestResponse`` in a new
        ``WrapperTestResponse`` class.
    :param use_cookies: Whether a cookie jar is created to store
        cookies and send them with later requests.
    :param allow_subdomain_redirects: Whether redirects to a subdomain
        of the requested server name may be followed.
    """
    self.application = application

    if response_wrapper in {None, Response}:
        response_wrapper = TestResponse
    elif not issubclass(response_wrapper, TestResponse):
        # ``response_wrapper`` is a class, so ``issubclass`` is the
        # correct check. ``isinstance`` is always false for a class
        # here, and mixing ``TestResponse`` in front of one of its own
        # subclasses cannot produce a consistent MRO.
        response_wrapper = type(
            "WrapperTestResponse",
            (TestResponse, response_wrapper),  # type: ignore
            {},
        )

    self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper)

    if use_cookies:
        self.cookie_jar: t.Optional[_TestCookieJar] = _TestCookieJar()
    else:
        self.cookie_jar = None

    self.allow_subdomain_redirects = allow_subdomain_redirects
def set_cookie(
    self,
    server_name: str,
    key: str,
    value: str = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: str = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    samesite: t.Optional[str] = None,
    charset: str = "utf-8",
) -> None:
    """Store a cookie in the client's cookie jar.

    The cookie is rendered as a ``Set-Cookie`` header and handed to the
    jar as if it came from a response for ``path`` on
    ``http://{server_name}``. ``server_name`` is required and has to
    match the one that is also passed to the open call.
    """
    assert self.cookie_jar is not None, "cookies disabled"

    cookie_header = dump_cookie(
        key,
        value,
        max_age,
        expires,
        path,
        domain,
        secure,
        httponly,
        charset,
        samesite=samesite,
    )
    # Environ describing the URL the cookie is registered for.
    origin = create_environ(path, base_url=f"http://{server_name}")
    self.cookie_jar.extract_wsgi(origin, [("Set-Cookie", cookie_header)])
def delete_cookie(
    self,
    server_name: str,
    key: str,
    path: str = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    samesite: t.Optional[str] = None,
) -> None:
    """Delete a cookie in the test client by setting it again with
    ``expires=0`` and ``max_age=0``.
    """
    expired = {
        "expires": 0,
        "max_age": 0,
        "path": path,
        "domain": domain,
        "secure": secure,
        "httponly": httponly,
        "samesite": samesite,
    }
    self.set_cookie(server_name, key, **expired)
def run_wsgi_app(
    self, environ: "WSGIEnvironment", buffered: bool = False
) -> t.Tuple[t.Iterable[bytes], str, Headers]:
    """Call the wrapped WSGI application with ``environ``.

    When a cookie jar is in use, its cookies are injected into
    ``environ`` before the call and the third item of the result is
    passed back to the jar afterwards.

    :meta private:
    """
    cookies_enabled = self.cookie_jar is not None

    if cookies_enabled:
        self.cookie_jar.inject_wsgi(environ)

    # Resolves to the module-level function, not this method.
    result = run_wsgi_app(self.application, environ, buffered=buffered)

    # Re-check the jar rather than reusing ``cookies_enabled``: the
    # attribute is looked up again after the application has run.
    if self.cookie_jar is not None:
        self.cookie_jar.extract_wsgi(environ, result[2])

    return result
def resolve_redirect(
    self, response: "TestResponse", buffered: bool = False
) -> "TestResponse":
    """Follow a redirect response by issuing a new request to its
    ``location``.

    :raises RuntimeError: If the location points to a subdomain while
        ``allow_subdomain_redirects`` is false, or to any other server
        name.

    :meta private:
    """
    scheme, netloc, path, qs, _anchor = url_parse(response.location)
    builder = EnvironBuilder.from_environ(
        response.request.environ, path=path, query_string=qs
    )

    # Read the current server name before the host may be replaced.
    source_labels = builder.server_name.split(".")
    target_labels = netloc.split(":", 1)[0].split(".")

    if target_labels == [""]:
        # A local redirect with autocorrect_location_header=False
        # doesn't have a host, so use the request's host.
        target_labels = source_labels
    else:
        # The new location has a host, use it for the base URL.
        builder.url_scheme = scheme
        builder.host = netloc

    # Explain why a redirect to a different server name won't be followed.
    if target_labels != source_labels:
        is_subdomain = target_labels[-len(source_labels) :] == source_labels

        if not is_subdomain:
            raise RuntimeError("Following external redirects is not supported.")

        if not self.allow_subdomain_redirects:
            raise RuntimeError("Following subdomain redirects is not enabled.")

    script_root = builder.script_root
    root_parts = script_root.split("/")

    if path.split("/")[: len(root_parts)] == root_parts:
        # Strip the script root from the path.
        builder.path = path[len(script_root) :]
    else:
        # The new location is not under the script root, so use the
        # whole path and clear the previous root.
        builder.path = path
        builder.script_root = ""

    # Only 307 and 308 preserve all of the original request.
    keeps_request = response.status_code in {307, 308}

    if not keeps_request:
        # HEAD is preserved, everything else becomes GET.
        if builder.method != "HEAD":
            builder.method = "GET"

        # Clear the body and the headers that describe it.
        body = builder.input_stream

        if body is not None:
            body.close()
            builder.input_stream = None

        builder.content_type = None
        builder.content_length = None
        builder.headers.pop("Transfer-Encoding", None)

    return self.open(builder, buffered=buffered)
def open(
    self,
    *args: t.Any,
    buffered: bool = False,
    follow_redirects: bool = False,
    **kwargs: t.Any,
) -> "TestResponse":
    """Build a request from the arguments, run it against the
    application, and return the wrapped response.

    :param args: Passed to :class:`EnvironBuilder` to create the
        environ for the request. When exactly one positional argument
        and no keyword arguments are given, that argument may instead
        be an existing :class:`EnvironBuilder`, an environ dict, or a
        :class:`Request`.
    :param buffered: Convert the iterator returned by the app into
        a list. If the iterator has a ``close()`` method, it is
        called automatically.
    :param follow_redirects: Keep issuing requests for 301, 302, 303,
        305, 307 and 308 responses until a different status is
        returned. :attr:`TestResponse.history` lists the intermediate
        responses.
    :raises ClientRedirectError: If the same ``(location, status)``
        redirect is encountered twice while following redirects.

    .. versionchanged:: 2.1
        Removed the ``as_tuple`` parameter.

    .. versionchanged:: 2.0
        ``as_tuple`` is deprecated and will be removed in Werkzeug
        2.1. Use :attr:`TestResponse.request` and
        ``request.environ`` instead.

    .. versionchanged:: 2.0
        The request input stream is closed when calling
        ``response.close()``. Input streams for redirects are
        automatically closed.

    .. versionchanged:: 0.5
        If a dict is provided as file in the dict for the ``data``
        parameter the content type has to be called ``content_type``
        instead of ``mimetype``. This change was made for
        consistency with :class:`werkzeug.FileWrapper`.

    .. versionchanged:: 0.5
        Added the ``follow_redirects`` parameter.
    """
    request: t.Optional["Request"] = None

    # A lone positional argument may already describe the whole request.
    if len(args) == 1 and not kwargs:
        sole_arg = args[0]

        if isinstance(sole_arg, EnvironBuilder):
            request = sole_arg.get_request()
        elif isinstance(sole_arg, dict):
            request = EnvironBuilder.from_environ(sole_arg).get_request()
        elif isinstance(sole_arg, Request):
            request = sole_arg

    # Otherwise treat everything as EnvironBuilder constructor arguments.
    if request is None:
        builder = EnvironBuilder(*args, **kwargs)

        try:
            request = builder.get_request()
        finally:
            builder.close()

    app_result = self.run_wsgi_app(request.environ, buffered=buffered)
    response = self.response_wrapper(*app_result, request=request)

    # NOTE(review): on this path no close hook for the request input
    # stream is registered; that only happens after following redirects.
    if not follow_redirects:
        return response

    seen_redirects = set()
    history: t.List["TestResponse"] = []
    redirect_codes = {301, 302, 303, 305, 307, 308}

    while response.status_code in redirect_codes:
        # Exhaust intermediate response bodies to ensure middleware
        # that returns an iterator runs any cleanup code.
        if not buffered:
            response.make_sequence()
            response.close()

        redirect_key = (response.location, response.status_code)

        if redirect_key in seen_redirects:
            raise ClientRedirectError(
                f"Loop detected: A {response.status_code} redirect"
                f" to {response.location} was already made."
            )

        seen_redirects.add(redirect_key)
        # Each intermediate response records the responses before it.
        response.history = tuple(history)
        history.append(response)
        response = self.resolve_redirect(response, buffered=buffered)

    # This is the final request after redirects.
    response.history = tuple(history)
    # Close the input stream when closing the response, in case
    # the input is an open temporary file.
    response.call_on_close(request.input_stream.close)
    return response
def get(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Shortcut for :meth:`open` with ``method`` forced to ``GET``.

    A ``method`` keyword supplied by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "GET"})
def post(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Shortcut for :meth:`open` with ``method`` forced to ``POST``.

    A ``method`` keyword supplied by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "POST"})
def put(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Shortcut for :meth:`open` with ``method`` forced to ``PUT``.

    A ``method`` keyword supplied by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "PUT"})
def delete(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Shortcut for :meth:`open` with ``method`` forced to ``DELETE``.

    A ``method`` keyword supplied by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "DELETE"})
def patch(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Shortcut for :meth:`open` with ``method`` forced to ``PATCH``.

    A ``method`` keyword supplied by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "PATCH"})
def options(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Shortcut for :meth:`open` with ``method`` forced to ``OPTIONS``.

    A ``method`` keyword supplied by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "OPTIONS"})
def head(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Shortcut for :meth:`open` with ``method`` forced to ``HEAD``.

    A ``method`` keyword supplied by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "HEAD"})
def trace(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Shortcut for :meth:`open` with ``method`` forced to ``TRACE``.

    A ``method`` keyword supplied by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "TRACE"})
def __repr__(self) -> str:
    """Return ``<ClassName app>`` where ``app`` is the ``repr()`` of the
    wrapped application.
    """
    return "<{} {!r}>".format(type(self).__name__, self.application)
def create_environ(*args: t.Any, **kwargs: t.Any) -> "WSGIEnvironment":
    """Build a new WSGI environ dict from the given values.

    Every positional and keyword argument is forwarded unchanged to the
    :class:`EnvironBuilder` constructor, so this accepts exactly the
    arguments that constructor accepts. The temporary builder is closed
    before returning, whether or not building the environ succeeded.

    .. versionchanged:: 0.5
        This function is now a thin wrapper over :class:`EnvironBuilder` which
        was added in 0.5. The `headers`, `environ_base`, `environ_overrides`
        and `charset` parameters were added.
    """
    env_builder = EnvironBuilder(*args, **kwargs)

    try:
        environ = env_builder.get_environ()
    finally:
        # Always release whatever the builder holds open.
        env_builder.close()

    return environ
def run_wsgi_app(
    app: "WSGIApplication", environ: "WSGIEnvironment", buffered: bool = False
) -> t.Tuple[t.Iterable[bytes], str, Headers]:
    """Return a tuple in the form (app_iter, status, headers) of the
    application output. This works best if you pass it an application that
    returns an iterator all the time.

    Sometimes applications may use the `write()` callable returned
    by the `start_response` function. Data passed to it is collected and
    emitted as part of the returned body, in both buffered and unbuffered
    mode. If you don't get the expected output you should set `buffered`
    to `True` which enforces buffering.

    If the application's return value has a ``close()`` method, it is
    called once the body has been fully buffered, when iterating the
    application fails inside this function, or (unbuffered) by the
    returned iterator wrapper.

    If passed an invalid WSGI application the behavior of this function is
    undefined. Never pass non-conforming WSGI applications to this function.

    :param app: the application to execute.
    :param environ: the WSGI environ to call the application with. A copy
        is passed to the application.
    :param buffered: set to `True` to enforce buffering.
    :return: tuple in the form ``(app_iter, status, headers)``
    """
    # Copy environ to ensure any mutations by the app (ProxyFix, for
    # example) don't affect subsequent requests (such as redirects).
    environ = _get_environ(environ).copy()
    status: str
    response: t.Optional[t.Tuple[str, t.List[t.Tuple[str, str]]]] = None
    # Collects data sent through ``write()`` as well as items pulled from
    # the application iterator, in the order they were produced.
    buffer: t.List[bytes] = []

    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal response

        if exc_info:
            try:
                raise exc_info[1].with_traceback(exc_info[2])
            finally:
                # Drop the reference so the traceback isn't kept alive.
                exc_info = None

        response = (status, headers)
        # The WSGI ``write()`` callable: append straight to the buffer.
        return buffer.append

    app_rv = app(environ, start_response)
    close_func = getattr(app_rv, "close", None)
    app_iter: t.Iterable[bytes] = iter(app_rv)

    # when buffering we emit the close call early and convert the
    # application output into a regular list
    if buffered:
        try:
            # Append item by item into the same list ``write()`` uses so
            # written and yielded chunks stay in production order and
            # written data is not lost.
            for item in app_iter:
                buffer.append(item)
        finally:
            if close_func is not None:
                close_func()

        app_iter = buffer

    # otherwise we iterate the application iter until we have a response, chain
    # the already received data with the already collected data and wrap it in
    # a new `ClosingIterator` if we need to restore a `close` callable from the
    # original return value.
    else:
        try:
            for item in app_iter:
                buffer.append(item)

                if response is not None:
                    break
        except BaseException:
            # Nothing is returned to the caller in this case, so this is
            # the only place left that can close the application iterable.
            if close_func is not None:
                close_func()

            raise

        if buffer:
            app_iter = chain(buffer, app_iter)

        if close_func is not None and app_iter is not app_rv:
            app_iter = ClosingIterator(app_iter, close_func)

    status, headers = response  # type: ignore
    return app_iter, status, Headers(headers)
class TestResponse(Response):
    """:class:`~werkzeug.wrappers.Response` subclass that provides extra
    information about requests made with the test :class:`Client`.

    Test client requests will always return an instance of this class.
    If a custom response class is passed to the client, it is
    subclassed along with this to support test information.

    If the test request included large files, or if the application is
    serving a file, call :meth:`close` to close any open files and
    prevent Python showing a ``ResourceWarning``.

    .. versionchanged:: 2.1
        Removed deprecated behavior for treating the response instance
        as a tuple.

    .. versionadded:: 2.0
        Test client methods always return instances of this class.
    """

    request: Request
    """A request object with the environ used to make the request that
    resulted in this response.
    """

    history: t.Tuple["TestResponse", ...]
    """A tuple of intermediate responses. Populated when the test request
    is made with ``follow_redirects`` enabled.
    """

    # Tell Pytest to ignore this, it's not a test class.
    __test__ = False

    def __init__(
        self,
        response: t.Iterable[bytes],
        status: str,
        headers: Headers,
        request: Request,
        history: t.Tuple["TestResponse", ...] = (),
        **kwargs: t.Any,
    ) -> None:
        """Wrap the output of a WSGI call together with its request.

        :param response: The body iterable returned by the application.
        :param status: The status line passed to ``start_response``.
        :param headers: The response headers.
        :param request: The request that produced this response.
        :param history: Earlier responses in a redirect chain; empty by
            default.
        :param kwargs: Forwarded to the base response class.
        """
        super().__init__(response, status, headers, **kwargs)
        self.request = request
        self.history = history
        # NOTE(review): nothing in this class reads ``_compat_tuple``; it
        # looks like a leftover of the tuple behavior removed in 2.1.
        # Confirm there are no external readers before dropping it.
        self._compat_tuple = response, status, headers

    @cached_property
    def text(self) -> str:
        """The response data as text. A shortcut for
        ``response.get_data(as_text=True)``.

        .. versionadded:: 2.1
        """
        return self.get_data(as_text=True)