Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/test.py: 27%

572 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import mimetypes 

2import sys 

3import typing as t 

4from collections import defaultdict 

5from datetime import datetime 

6from datetime import timedelta 

7from http.cookiejar import CookieJar 

8from io import BytesIO 

9from itertools import chain 

10from random import random 

11from tempfile import TemporaryFile 

12from time import time 

13from urllib.request import Request as _UrllibRequest 

14 

15from ._internal import _get_environ 

16from ._internal import _make_encode_wrapper 

17from ._internal import _wsgi_decoding_dance 

18from ._internal import _wsgi_encoding_dance 

19from .datastructures import Authorization 

20from .datastructures import CallbackDict 

21from .datastructures import CombinedMultiDict 

22from .datastructures import EnvironHeaders 

23from .datastructures import FileMultiDict 

24from .datastructures import Headers 

25from .datastructures import MultiDict 

26from .http import dump_cookie 

27from .http import dump_options_header 

28from .http import parse_options_header 

29from .sansio.multipart import Data 

30from .sansio.multipart import Epilogue 

31from .sansio.multipart import Field 

32from .sansio.multipart import File 

33from .sansio.multipart import MultipartEncoder 

34from .sansio.multipart import Preamble 

35from .urls import iri_to_uri 

36from .urls import url_encode 

37from .urls import url_fix 

38from .urls import url_parse 

39from .urls import url_unparse 

40from .urls import url_unquote 

41from .utils import cached_property 

42from .utils import get_content_type 

43from .wrappers.request import Request 

44from .wrappers.response import Response 

45from .wsgi import ClosingIterator 

46from .wsgi import get_current_url 

47 

48if t.TYPE_CHECKING: 

49 from _typeshed.wsgi import WSGIApplication 

50 from _typeshed.wsgi import WSGIEnvironment 

51 

52 

def stream_encode_multipart(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: t.Optional[str] = None,
    charset: str = "utf-8",
) -> t.Tuple[t.IO[bytes], int, str]:
    """Encode a dict of values (either strings or file descriptors or
    :class:`FileStorage` objects.) into a multipart encoded string stored
    in a file descriptor.

    :param data: Mapping of field names to values. A value with a
        ``read`` attribute is streamed as a file part, anything else is
        converted with ``str()`` and sent as a plain field. List values
        are expanded by :func:`_iter_data`.
    :param use_tempfile: If true, the output moves from memory to a
        :func:`~tempfile.TemporaryFile` once more than ``threshold``
        bytes have been written.
    :param threshold: Number of bytes kept in memory before spilling
        to disk. Only consulted when ``use_tempfile`` is true.
    :param boundary: Multipart boundary to use. A boundary built from
        the current time and a random number is generated when omitted.
    :param charset: Charset used to encode plain field values.
    :return: ``(stream, length, boundary)`` where ``stream`` is rewound
        to position 0 and ``length`` is the number of bytes written.
    """
    if boundary is None:
        boundary = f"---------------WerkzeugFormPart_{time()}{random()}"

    stream: t.IO[bytes] = BytesIO()
    total_length = 0
    on_disk = False
    write_binary: t.Callable[[bytes], int]

    if use_tempfile:

        def write_binary(s: bytes) -> int:
            nonlocal stream, total_length, on_disk

            # Once spilled to disk there is nothing left to track.
            if on_disk:
                return stream.write(s)

            length = len(s)

            if length + total_length <= threshold:
                stream.write(s)
            else:
                # Crossing the threshold: copy what was buffered in
                # memory into a temporary file and continue there.
                new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
                new_stream.write(stream.getvalue())  # type: ignore
                new_stream.write(s)
                stream = new_stream
                on_disk = True

            total_length += length
            return length

    else:
        write_binary = stream.write

    encoder = MultipartEncoder(boundary.encode())
    write_binary(encoder.send_event(Preamble(data=b"")))

    for key, value in _iter_data(data):
        reader = getattr(value, "read", None)

        if reader is not None:
            # File-like value: take the name from ``filename`` and fall
            # back to ``name`` (as set on regular file objects).
            filename = getattr(value, "filename", getattr(value, "name", None))
            content_type = getattr(value, "content_type", None)

            if content_type is None:
                guessed = mimetypes.guess_type(filename)[0] if filename else None
                content_type = guessed or "application/octet-stream"

            headers = Headers([("Content-Type", content_type)])

            if filename is None:
                write_binary(encoder.send_event(Field(name=key, headers=headers)))
            else:
                write_binary(
                    encoder.send_event(
                        File(name=key, filename=filename, headers=headers)
                    )
                )

            while True:
                chunk = reader(16384)

                if not chunk:
                    # Tell the encoder that this part is complete, the
                    # same way the plain field branch below does. The
                    # event carries no payload.
                    write_binary(
                        encoder.send_event(Data(data=b"", more_data=False))
                    )
                    break

                write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
        else:
            if not isinstance(value, str):
                value = str(value)

            write_binary(encoder.send_event(Field(name=key, headers=Headers())))
            write_binary(
                encoder.send_event(Data(data=value.encode(charset), more_data=False))
            )

    write_binary(encoder.send_event(Epilogue(data=b"")))

    # The write position equals the number of bytes produced; rewind so
    # the caller can read the body from the start.
    length = stream.tell()
    stream.seek(0)
    return stream, length, boundary

139 

140 

def encode_multipart(
    values: t.Mapping[str, t.Any],
    boundary: t.Optional[str] = None,
    charset: str = "utf-8",
) -> t.Tuple[str, bytes]:
    """In-memory variant of `stream_encode_multipart`.

    The body is never spilled to a temporary file. Returns a tuple in
    the form (``boundary``, ``data``) where data is the complete encoded
    body as bytes.
    """
    encoded = stream_encode_multipart(
        values, use_tempfile=False, boundary=boundary, charset=charset
    )
    # encoded is (stream, length, boundary); the length is not needed.
    body_stream = encoded[0]
    used_boundary = encoded[2]
    payload = body_stream.read()
    return used_boundary, payload

153 

154 

class _TestCookieHeaders:
    """Wraps a sequence of ``(name, value)`` header pairs in the lookup
    interface cookielib uses to read response headers.
    """

    def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
        self.headers = headers

    def getheaders(self, name: str) -> t.Iterable[str]:
        """Return every value stored under ``name``, compared
        case-insensitively, in the order the headers appear.
        """
        wanted = name.lower()
        return [value for key, value in self.headers if key.lower() == wanted]

    def get_all(
        self, name: str, default: t.Optional[t.Iterable[str]] = None
    ) -> t.Iterable[str]:
        """Like :meth:`getheaders`, but return ``default`` when no
        header matches.
        """
        matches = self.getheaders(name)
        return matches if matches else default  # type: ignore

178 

179 

class _TestCookieResponse:
    """Minimal stand-in for a httplib.HTTPResponse so that cookielib can
    read the headers of our test responses.
    """

    def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
        adapted = _TestCookieHeaders(headers)
        self.headers = adapted

    def info(self) -> _TestCookieHeaders:
        """Return the wrapped headers adapter."""
        return self.headers

190 

191 

class _TestCookieJar(CookieJar):
    """A cookielib.CookieJar that can write its cookies into a WSGI
    environ and read new cookies from WSGI response headers.
    """

    def inject_wsgi(self, environ: "WSGIEnvironment") -> None:
        """Write the stored cookies into ``environ`` as the
        ``HTTP_COOKIE`` client header, or drop that key when the jar
        holds no cookies.
        """
        pairs = [f"{cookie.name}={cookie.value}" for cookie in self]

        if not pairs:
            environ.pop("HTTP_COOKIE", None)
            return

        environ["HTTP_COOKIE"] = "; ".join(pairs)

    def extract_wsgi(
        self,
        environ: "WSGIEnvironment",
        headers: t.Union[Headers, t.List[t.Tuple[str, str]]],
    ) -> None:
        """Store the cookies set by the server's response ``headers``
        in the jar, using the URL of ``environ`` as the request URL.
        """
        response = _TestCookieResponse(headers)
        request = _UrllibRequest(get_current_url(environ))
        self.extract_cookies(response, request)  # type: ignore

220 

221 

def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[t.Tuple[str, t.Any]]:
    """Yield every ``(key, value)`` pair of a mapping whose values may
    be lists of values.

    A :class:`MultiDict` is expanded through ``items(multi=True)``. For
    any other mapping only ``list`` values are expanded; tuples are
    yielded unchanged so they can be used to describe files.
    """
    if isinstance(data, MultiDict):
        yield from data.items(multi=True)
        return

    for name, entry in data.items():
        # Treat a single value as a one-item list to share the loop.
        entries = entry if isinstance(entry, list) else [entry]

        for item in entries:
            yield name, item

236 

237 

# Type variable restricted to MultiDict and its subclasses. It lets
# EnvironBuilder._get_form declare that it returns an instance of the
# storage class it was given.
_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound=MultiDict)

239 

240 

class EnvironBuilder:
    """This class can be used to conveniently create a WSGI environment
    for testing purposes.  It can be used to quickly create WSGI environments
    or request objects from arbitrary data.

    The signature of this class is also used in some other places as of
    Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
    :meth:`Client.open`).  Because of this most of the functionality is
    available through the constructor alone.

    Files and regular form data can be manipulated independently of each
    other with the :attr:`form` and :attr:`files` attributes, but are
    passed with the same argument to the constructor: `data`.

    `data` can be any of these values:

    -   a `str` or `bytes` object: The object is converted into an
        :attr:`input_stream`, the :attr:`content_length` is set and you have to
        provide a :attr:`content_type`.
    -   a `dict` or :class:`MultiDict`: The keys have to be strings. The values
        have to be either any of the following objects, or a list of any of the
        following objects:

        -   a :class:`file`-like object:  These are converted into
            :class:`FileStorage` objects automatically.
        -   a `tuple`:  The :meth:`~FileMultiDict.add_file` method is called
            with the key and the unpacked `tuple` items as positional
            arguments.
        -   a `str`:  The string is set as form data for the associated key.
    -   a file-like object: The object content is loaded in memory and then
        handled like a regular `str` or a `bytes`.

    :param path: the path of the request.  In the WSGI environment this will
                 end up as `PATH_INFO`.  If the `query_string` is not defined
                 and there is a question mark in the `path` everything after
                 it is used as query string.
    :param base_url: the base URL is a URL that is used to extract the WSGI
                     URL scheme, host (server name + server port) and the
                     script root (`SCRIPT_NAME`).
    :param query_string: an optional string or dict with URL parameters.
    :param method: the HTTP method to use, defaults to `GET`.
    :param input_stream: an optional input stream.  Do not specify this and
                         `data`.  As soon as an input stream is set you can't
                         modify :attr:`args` and :attr:`files` unless you
                         set the :attr:`input_stream` to `None` again.
    :param content_type: The content type for the request.  As of 0.5 you
                         don't have to provide this when specifying files
                         and form data via `data`.
    :param content_length: The content length for the request.  You don't
                           have to specify this when providing data via
                           `data`.
    :param errors_stream: an optional error stream that is used for
                          `wsgi.errors`.  Defaults to :data:`stderr`.
    :param multithread: controls `wsgi.multithread`.  Defaults to `False`.
    :param multiprocess: controls `wsgi.multiprocess`.  Defaults to `False`.
    :param run_once: controls `wsgi.run_once`.  Defaults to `False`.
    :param headers: an optional list or :class:`Headers` object of headers.
    :param data: a string or dict of form data or a file-object.
                 See explanation above.
    :param json: An object to be serialized and assigned to ``data``.
        Defaults the content type to ``"application/json"``.
        Serialized with the function assigned to :attr:`json_dumps`.
    :param environ_base: an optional dict of environment defaults.
    :param environ_overrides: an optional dict of environment overrides.
    :param charset: the charset used to encode string data.
    :param mimetype: an optional mimetype. When given it is assigned to
        :attr:`mimetype` after all other arguments were processed, which
        sets the content type from it and ``charset``.
    :param auth: An authorization object to use for the
        ``Authorization`` header value. A ``(username, password)`` tuple
        is a shortcut for ``Basic`` authorization.

    .. versionchanged:: 2.1
        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
        header keys in the environ.

    .. versionchanged:: 2.0
        ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
        the query string, not only the path.

    .. versionchanged:: 2.0
        The default :attr:`request_class` is ``Request`` instead of
        ``BaseRequest``.

    .. versionadded:: 2.0
       Added the ``auth`` parameter.

    .. versionadded:: 0.15
        The ``json`` param and :meth:`json_dumps` method.

    .. versionadded:: 0.15
        The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
        the path before percent-decoding. This is not part of the WSGI
        PEP, but many WSGI servers include it.

    .. versionchanged:: 0.6
       ``path`` and ``base_url`` can now be unicode strings that are
       encoded with :func:`iri_to_uri`.
    """

    #: the server protocol to use.  defaults to HTTP/1.1
    server_protocol = "HTTP/1.1"

    #: the wsgi version to use.  defaults to (1, 0)
    wsgi_version = (1, 0)

    #: The default request class used by :meth:`get_request`.
    request_class = Request

    # ``json`` is imported inside the class body only long enough to
    # bind ``json.dumps`` below; the ``del`` keeps the module from
    # becoming a class attribute named ``json``.
    import json

    #: The serialization function used when ``json`` is passed.
    json_dumps = staticmethod(json.dumps)
    del json

    # Backing fields for the ``args``, ``query_string``,
    # ``input_stream``, ``form`` and ``files`` properties defined below.
    _args: t.Optional[MultiDict]
    _query_string: t.Optional[str]
    _input_stream: t.Optional[t.IO[bytes]]
    _form: t.Optional[MultiDict]
    _files: t.Optional[FileMultiDict]

    def __init__(
        self,
        path: str = "/",
        base_url: t.Optional[str] = None,
        query_string: t.Optional[t.Union[t.Mapping[str, str], str]] = None,
        method: str = "GET",
        input_stream: t.Optional[t.IO[bytes]] = None,
        content_type: t.Optional[str] = None,
        content_length: t.Optional[int] = None,
        errors_stream: t.Optional[t.IO[str]] = None,
        multithread: bool = False,
        multiprocess: bool = False,
        run_once: bool = False,
        headers: t.Optional[t.Union[Headers, t.Iterable[t.Tuple[str, str]]]] = None,
        data: t.Optional[
            t.Union[t.IO[bytes], str, bytes, t.Mapping[str, t.Any]]
        ] = None,
        environ_base: t.Optional[t.Mapping[str, t.Any]] = None,
        environ_overrides: t.Optional[t.Mapping[str, t.Any]] = None,
        charset: str = "utf-8",
        mimetype: t.Optional[str] = None,
        json: t.Optional[t.Mapping[str, t.Any]] = None,
        auth: t.Optional[t.Union[Authorization, t.Tuple[str, str]]] = None,
    ) -> None:
        # A query string may come from the path or from the argument,
        # but not from both.
        path_s = _make_encode_wrapper(path)
        if query_string is not None and path_s("?") in path:
            raise ValueError("Query string is defined in the path and as an argument")
        request_uri = url_parse(path)
        if query_string is None and path_s("?") in path:
            query_string = request_uri.query
        self.charset = charset
        self.path = iri_to_uri(request_uri.path)
        # Keep the path exactly as passed in; get_environ uses it for
        # REQUEST_URI / RAW_URI.
        self.request_uri = path
        if base_url is not None:
            base_url = url_fix(iri_to_uri(base_url, charset), charset)
        # The base_url setter splits the value into url_scheme, host and
        # script_root, using defaults when it is None.
        self.base_url = base_url  # type: ignore
        # Strings are stored as the raw query string, anything else is
        # normalized to a MultiDict and stored as args. Each setter
        # clears the other representation.
        if isinstance(query_string, (bytes, str)):
            self.query_string = query_string
        else:
            if query_string is None:
                query_string = MultiDict()
            elif not isinstance(query_string, MultiDict):
                query_string = MultiDict(query_string)
            self.args = query_string
        self.method = method
        if headers is None:
            headers = Headers()
        elif not isinstance(headers, Headers):
            headers = Headers(headers)
        self.headers = headers
        # content_type is stored in self.headers, so it can only be set
        # after the headers were assigned.
        if content_type is not None:
            self.content_type = content_type
        if errors_stream is None:
            errors_stream = sys.stderr
        self.errors_stream = errors_stream
        self.multithread = multithread
        self.multiprocess = multiprocess
        self.run_once = run_once
        self.environ_base = environ_base
        self.environ_overrides = environ_overrides
        # The input_stream setter also resets _form and _files to None,
        # which is what initializes those two attributes.
        self.input_stream = input_stream
        self.content_length = content_length
        self.closed = False

        if auth is not None:
            # A (username, password) tuple is turned into basic auth.
            if isinstance(auth, tuple):
                auth = Authorization(
                    "basic", {"username": auth[0], "password": auth[1]}
                )

            self.headers.set("Authorization", auth.to_header())

        if json is not None:
            if data is not None:
                raise TypeError("can't provide both json and data")

            # From here on the serialized JSON is handled like ``data``.
            data = self.json_dumps(json)

            if self.content_type is None:
                self.content_type = "application/json"

        if data:
            if input_stream is not None:
                raise TypeError("can't provide input stream and data")
            # File-like data is read completely into memory.
            if hasattr(data, "read"):
                data = data.read()  # type: ignore
            if isinstance(data, str):
                data = data.encode(self.charset)
            if isinstance(data, bytes):
                # Raw body: becomes the input stream. An explicitly
                # passed content length is kept.
                self.input_stream = BytesIO(data)
                if self.content_length is None:
                    self.content_length = len(data)
            else:
                # Mapping: tuples, dicts and file-like values are added
                # as files, everything else as form values.
                for key, value in _iter_data(data):  # type: ignore
                    if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
                        self._add_file_from_data(key, value)
                    else:
                        self.form.setlistdefault(key).append(value)

        # Applied last; the mimetype setter overwrites the content type.
        if mimetype is not None:
            self.mimetype = mimetype

    @classmethod
    def from_environ(
        cls, environ: "WSGIEnvironment", **kwargs: t.Any
    ) -> "EnvironBuilder":
        """Turn an environ dict back into a builder. Any extra kwargs
        override the args extracted from the environ.

        .. versionchanged:: 2.0
            Path and query values are passed through the WSGI decoding
            dance to avoid double encoding.

        .. versionadded:: 0.15
        """
        headers = Headers(EnvironHeaders(environ))
        # Host, Content-Type and Content-Length are popped from the
        # headers because they are passed as dedicated arguments.
        # Popping "Host" has no default, so a missing Host header raises.
        out = {
            "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
            "base_url": cls._make_base_url(
                environ["wsgi.url_scheme"],
                headers.pop("Host"),
                _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
            ),
            "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
            "method": environ["REQUEST_METHOD"],
            "input_stream": environ["wsgi.input"],
            "content_type": headers.pop("Content-Type", None),
            "content_length": headers.pop("Content-Length", None),
            "errors_stream": environ["wsgi.errors"],
            "multithread": environ["wsgi.multithread"],
            "multiprocess": environ["wsgi.multiprocess"],
            "run_once": environ["wsgi.run_once"],
            "headers": headers,
        }
        out.update(kwargs)
        return cls(**out)

    def _add_file_from_data(
        self,
        key: str,
        value: t.Union[
            t.IO[bytes], t.Tuple[t.IO[bytes], str], t.Tuple[t.IO[bytes], str, str]
        ],
    ) -> None:
        """Called in the EnvironBuilder to add files from the data dict."""
        # A tuple is unpacked into the positional arguments of add_file,
        # any other value is passed as the single file argument.
        if isinstance(value, tuple):
            self.files.add_file(key, *value)
        else:
            self.files.add_file(key, value)

    @staticmethod
    def _make_base_url(scheme: str, host: str, script_root: str) -> str:
        # Build the URL from its parts and make sure it ends with
        # exactly one slash.
        return url_unparse((scheme, host, script_root, "", "")).rstrip("/") + "/"

    @property
    def base_url(self) -> str:
        """The base URL is used to extract the URL scheme, host name,
        port, and root path.
        """
        return self._make_base_url(self.url_scheme, self.host, self.script_root)

    @base_url.setter
    def base_url(self, value: t.Optional[str]) -> None:
        # None selects the defaults: http://localhost with an empty
        # script root.
        if value is None:
            scheme = "http"
            netloc = "localhost"
            script_root = ""
        else:
            scheme, netloc, script_root, qs, anchor = url_parse(value)
            if qs or anchor:
                raise ValueError("base url must not contain a query string or fragment")
        self.script_root = script_root.rstrip("/")
        self.host = netloc
        self.url_scheme = scheme

    @property
    def content_type(self) -> t.Optional[str]:
        """The content type for the request.  Reflected from and to
        the :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        ct = self.headers.get("Content-Type")
        # Without an explicit header and without an input stream the
        # type is derived from the data: files take precedence over
        # form values.
        if ct is None and not self._input_stream:
            if self._files:
                return "multipart/form-data"
            if self._form:
                return "application/x-www-form-urlencoded"
            return None
        return ct

    @content_type.setter
    def content_type(self, value: t.Optional[str]) -> None:
        # None removes the header instead of storing it.
        if value is None:
            self.headers.pop("Content-Type", None)
        else:
            self.headers["Content-Type"] = value

    @property
    def mimetype(self) -> t.Optional[str]:
        """The mimetype (content type without charset etc.)

        .. versionadded:: 0.14
        """
        ct = self.content_type
        return ct.split(";")[0].strip() if ct else None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        # The content type is built from the mimetype and self.charset.
        self.content_type = get_content_type(value, self.charset)

    @property
    def mimetype_params(self) -> t.Mapping[str, str]:
        """The mimetype parameters as dict.  For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.14
        """

        # Passed to CallbackDict so that the Content-Type header is
        # rebuilt from the current mimetype and the dict's parameters.
        def on_update(d: CallbackDict) -> None:
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        d = parse_options_header(self.headers.get("content-type", ""))[1]
        return CallbackDict(d, on_update)

    @property
    def content_length(self) -> t.Optional[int]:
        """The content length as integer.  Reflected from and to the
        :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        return self.headers.get("Content-Length", type=int)

    @content_length.setter
    def content_length(self, value: t.Optional[int]) -> None:
        # None removes the header; other values are stored as a string.
        if value is None:
            self.headers.pop("Content-Length", None)
        else:
            self.headers["Content-Length"] = str(value)

    def _get_form(self, name: str, storage: t.Type[_TAnyMultiDict]) -> _TAnyMultiDict:
        """Common behavior for getting the :attr:`form` and
        :attr:`files` properties.

        :param name: Name of the internal cached attribute.
        :param storage: Storage class used for the data.
        :raises AttributeError: If an input stream is set.
        """
        if self.input_stream is not None:
            raise AttributeError("an input stream is defined")

        rv = getattr(self, name)

        # Create the storage on first access and cache it.
        if rv is None:
            rv = storage()
            setattr(self, name, rv)

        return rv  # type: ignore

    def _set_form(self, name: str, value: MultiDict) -> None:
        """Common behavior for setting the :attr:`form` and
        :attr:`files` properties.

        :param name: Name of the internal cached attribute.
        :param value: Value to assign to the attribute.
        """
        # Assigning form data or files discards any input stream.
        self._input_stream = None
        setattr(self, name, value)

    @property
    def form(self) -> MultiDict:
        """A :class:`MultiDict` of form values."""
        return self._get_form("_form", MultiDict)

    @form.setter
    def form(self, value: MultiDict) -> None:
        self._set_form("_form", value)

    @property
    def files(self) -> FileMultiDict:
        """A :class:`FileMultiDict` of uploaded files. Use
        :meth:`~FileMultiDict.add_file` to add new files.
        """
        return self._get_form("_files", FileMultiDict)

    @files.setter
    def files(self, value: FileMultiDict) -> None:
        self._set_form("_files", value)

    @property
    def input_stream(self) -> t.Optional[t.IO[bytes]]:
        """An optional input stream. This is mutually exclusive with
        setting :attr:`form` and :attr:`files`, setting it will clear
        those. Do not provide this if the method is not ``POST`` or
        another method that has a body.
        """
        return self._input_stream

    @input_stream.setter
    def input_stream(self, value: t.Optional[t.IO[bytes]]) -> None:
        # Form data and files are cleared on every assignment, also
        # when the new value is None.
        self._input_stream = value
        self._form = None
        self._files = None

    @property
    def query_string(self) -> str:
        """The query string.  If you set this to a string
        :attr:`args` will no longer be available.
        """
        # Without a raw query string the value is encoded from args,
        # or empty if there are no args either.
        if self._query_string is None:
            if self._args is not None:
                return url_encode(self._args, charset=self.charset)
            return ""
        return self._query_string

    @query_string.setter
    def query_string(self, value: t.Optional[str]) -> None:
        self._query_string = value
        self._args = None

    @property
    def args(self) -> MultiDict:
        """The URL arguments as :class:`MultiDict`."""
        if self._query_string is not None:
            raise AttributeError("a query string is defined")
        # Created on first access.
        if self._args is None:
            self._args = MultiDict()
        return self._args

    @args.setter
    def args(self, value: t.Optional[MultiDict]) -> None:
        self._query_string = None
        self._args = value

    @property
    def server_name(self) -> str:
        """The server name (read-only, use :attr:`host` to set)"""
        # Everything before the first colon of the host.
        return self.host.split(":", 1)[0]

    @property
    def server_port(self) -> int:
        """The server port as integer (read-only, use :attr:`host` to set)"""
        pieces = self.host.split(":", 1)

        if len(pieces) == 2:
            # A port part that is not an integer falls through to the
            # scheme default below.
            try:
                return int(pieces[1])
            except ValueError:
                pass

        if self.url_scheme == "https":
            return 443
        return 80

    def __del__(self) -> None:
        # Best-effort cleanup; any error raised by close() is ignored.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """Closes all files.  If you put real :class:`file` objects into the
        :attr:`files` dict you can call this method to automatically close
        them all in one go.
        """
        if self.closed:
            return
        # Accessing self.files raises AttributeError while an input
        # stream is set; there are no files to close in that case.
        try:
            files = self.files.values()
        except AttributeError:
            files = ()  # type: ignore
        for f in files:
            # Errors from closing a single file are ignored so that the
            # remaining files are still closed.
            try:
                f.close()
            except Exception:
                pass
        self.closed = True

    def get_environ(self) -> "WSGIEnvironment":
        """Return the built environ.

        .. versionchanged:: 0.15
            The content type and length headers are set based on
            input stream detection. Previously this only set the WSGI
            keys.
        """
        input_stream = self.input_stream
        content_length = self.content_length

        mimetype = self.mimetype
        content_type = self.content_type

        if input_stream is not None:
            # Measure the bytes left in the stream by seeking to its end
            # and back; this replaces any stored content length.
            start_pos = input_stream.tell()
            input_stream.seek(0, 2)
            end_pos = input_stream.tell()
            input_stream.seek(start_pos)
            content_length = end_pos - start_pos
        elif mimetype == "multipart/form-data":
            # Encode form values and files together; the generated
            # boundary is added to the content type.
            input_stream, content_length, boundary = stream_encode_multipart(
                CombinedMultiDict([self.form, self.files]), charset=self.charset
            )
            content_type = f'{mimetype}; boundary="{boundary}"'
        elif mimetype == "application/x-www-form-urlencoded":
            form_encoded = url_encode(self.form, charset=self.charset).encode("ascii")
            content_length = len(form_encoded)
            input_stream = BytesIO(form_encoded)
        else:
            # No body: use an empty stream.
            input_stream = BytesIO()

        # Layering: environ_base first, then the generated keys, then
        # the headers, and environ_overrides last.
        result: "WSGIEnvironment" = {}
        if self.environ_base:
            result.update(self.environ_base)

        # Unquote a path and run it through the WSGI encoding dance.
        def _path_encode(x: str) -> str:
            return _wsgi_encoding_dance(url_unquote(x, self.charset), self.charset)

        raw_uri = _wsgi_encoding_dance(self.request_uri, self.charset)
        result.update(
            {
                "REQUEST_METHOD": self.method,
                "SCRIPT_NAME": _path_encode(self.script_root),
                "PATH_INFO": _path_encode(self.path),
                "QUERY_STRING": _wsgi_encoding_dance(self.query_string, self.charset),
                # Non-standard, added by mod_wsgi, uWSGI
                "REQUEST_URI": raw_uri,
                # Non-standard, added by gunicorn
                "RAW_URI": raw_uri,
                "SERVER_NAME": self.server_name,
                "SERVER_PORT": str(self.server_port),
                "HTTP_HOST": self.host,
                "SERVER_PROTOCOL": self.server_protocol,
                "wsgi.version": self.wsgi_version,
                "wsgi.url_scheme": self.url_scheme,
                "wsgi.input": input_stream,
                "wsgi.errors": self.errors_stream,
                "wsgi.multithread": self.multithread,
                "wsgi.multiprocess": self.multiprocess,
                "wsgi.run_once": self.run_once,
            }
        )

        # Work on a copy so that self.headers is not modified.
        headers = self.headers.copy()
        # Don't send these as headers, they're part of the environ.
        headers.remove("Content-Type")
        headers.remove("Content-Length")

        if content_type is not None:
            result["CONTENT_TYPE"] = content_type

        if content_length is not None:
            result["CONTENT_LENGTH"] = str(content_length)

        # Headers that appear more than once are joined into a single
        # comma separated HTTP_* value.
        combined_headers = defaultdict(list)

        for key, value in headers.to_wsgi_list():
            combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)

        for key, values in combined_headers.items():
            result[key] = ", ".join(values)

        if self.environ_overrides:
            result.update(self.environ_overrides)

        return result

    def get_request(self, cls: t.Optional[t.Type[Request]] = None) -> Request:
        """Returns a request with the data.  If the request class is not
        specified :attr:`request_class` is used.

        :param cls: The request wrapper to use.
        """
        if cls is None:
            cls = self.request_class

        return cls(self.get_environ())

834 

835 

class ClientRedirectError(Exception):
    """If a redirect loop is detected when using follow_redirects=True with
    the :class:`Client`, then this exception is raised.
    """

840 

841 

842class Client: 

843 """This class allows you to send requests to a wrapped application. 

844 

845 The use_cookies parameter indicates whether cookies should be stored and 

846 sent for subsequent requests. This is True by default, but passing False 

847 will disable this behaviour. 

848 

849 If you want to request some subdomain of your application you may set 

850 `allow_subdomain_redirects` to `True` as if not no external redirects 

851 are allowed. 

852 

853 .. versionchanged:: 2.1 

854 Removed deprecated behavior of treating the response as a 

855 tuple. All data is available as properties on the returned 

856 response object. 

857 

858 .. versionchanged:: 2.0 

859 ``response_wrapper`` is always a subclass of 

860 :class:``TestResponse``. 

861 

862 .. versionchanged:: 0.5 

863 Added the ``use_cookies`` parameter. 

864 """ 

865 

866 def __init__( 

867 self, 

868 application: "WSGIApplication", 

869 response_wrapper: t.Optional[t.Type["Response"]] = None, 

870 use_cookies: bool = True, 

871 allow_subdomain_redirects: bool = False, 

872 ) -> None: 

873 self.application = application 

874 

875 if response_wrapper in {None, Response}: 

876 response_wrapper = TestResponse 

877 elif not isinstance(response_wrapper, TestResponse): 

878 response_wrapper = type( 

879 "WrapperTestResponse", 

880 (TestResponse, response_wrapper), # type: ignore 

881 {}, 

882 ) 

883 

884 self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper) 

885 

886 if use_cookies: 

887 self.cookie_jar: t.Optional[_TestCookieJar] = _TestCookieJar() 

888 else: 

889 self.cookie_jar = None 

890 

891 self.allow_subdomain_redirects = allow_subdomain_redirects 

892 

def set_cookie(
    self,
    server_name: str,
    key: str,
    value: str = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: str = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    samesite: t.Optional[str] = None,
    charset: str = "utf-8",
) -> None:
    """Store a cookie in the client's cookie jar.

    A ``Set-Cookie`` header is built with ``dump_cookie`` from the
    cookie arguments and fed to the jar as if it had been received in
    response to a request for ``path`` on ``http://{server_name}``.

    :param server_name: Required. Has to match the server name that is
        also passed to the ``open`` call.
    :param key: Name of the cookie.
    :param value: Value of the cookie.

    The remaining parameters are passed through to ``dump_cookie``.

    An ``AssertionError`` is raised if the client has no cookie jar.
    """
    jar = self.cookie_jar
    assert jar is not None, "cookies disabled"

    set_cookie_value = dump_cookie(
        key,
        value,
        max_age,
        expires,
        path,
        domain,
        secure,
        httponly,
        charset,
        samesite=samesite,
    )
    # The jar reads cookies from a (request environ, response headers)
    # pair, so build a minimal request for the given server and path.
    request_environ = create_environ(path, base_url=f"http://{server_name}")
    jar.extract_wsgi(request_environ, [("Set-Cookie", set_cookie_value)])

927 

def delete_cookie(
    self,
    server_name: str,
    key: str,
    path: str = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    samesite: t.Optional[str] = None,
) -> None:
    """Delete a cookie in the test client.

    Implemented by calling :meth:`set_cookie` for the same key with
    ``expires=0`` and ``max_age=0``; the other arguments are forwarded
    unchanged.
    """
    expiry = {"expires": 0, "max_age": 0}
    attributes = {
        "path": path,
        "domain": domain,
        "secure": secure,
        "httponly": httponly,
        "samesite": samesite,
    }
    self.set_cookie(server_name, key, **expiry, **attributes)

950 

def run_wsgi_app(
    self, environ: "WSGIEnvironment", buffered: bool = False
) -> t.Tuple[t.Iterable[bytes], str, Headers]:
    """Run the wrapped WSGI application with the given environ.

    When the client has a cookie jar, the jar's cookies are injected
    into ``environ`` before the call and the jar is updated from the
    response headers afterwards.

    :param environ: The WSGI environ for the request.
    :param buffered: Passed on to the module-level ``run_wsgi_app``.
    :return: The ``(app_iter, status, headers)`` tuple produced by the
        module-level ``run_wsgi_app``.

    :meta private:
    """
    if self.cookie_jar is not None:
        self.cookie_jar.inject_wsgi(environ)

    app_iter, status, headers = run_wsgi_app(
        self.application, environ, buffered=buffered
    )

    if self.cookie_jar is not None:
        self.cookie_jar.extract_wsgi(environ, headers)

    return app_iter, status, headers

967 

def resolve_redirect(
    self, response: "TestResponse", buffered: bool = False
) -> "TestResponse":
    """Make a new request to the location a redirect response points to.

    The new request starts from the environ of the request that
    produced ``response`` and is adjusted for the redirect target.

    :param response: The redirect response to follow.
    :param buffered: Passed on to :meth:`open` for the new request.
    :return: The response to the new request.
    :raises RuntimeError: If the target is a subdomain of the current
        server name and ``allow_subdomain_redirects`` is false, or if
        the target is any other different server name.

    :meta private:
    """
    target_scheme, target_netloc, target_path, target_query, _fragment = url_parse(
        response.location
    )
    builder = EnvironBuilder.from_environ(
        response.request.environ, path=target_path, query_string=target_query
    )

    # Host names are compared as lists of dot-separated labels, with
    # any port stripped from the target. The current name is read
    # before the builder's host is changed below.
    target_labels = target_netloc.split(":", 1)[0].split(".")
    current_labels = builder.server_name.split(".")

    if target_labels == [""]:
        # A local redirect with autocorrect_location_header=False
        # doesn't have a host, so use the request's host.
        target_labels = current_labels
    else:
        # The new location has a host, use it for the base URL.
        builder.url_scheme = target_scheme
        builder.host = target_netloc

    # Explain why a redirect to a different server name won't be followed.
    if target_labels != current_labels:
        is_subdomain = target_labels[-len(current_labels) :] == current_labels

        if not is_subdomain:
            raise RuntimeError("Following external redirects is not supported.")

        if not self.allow_subdomain_redirects:
            raise RuntimeError("Following subdomain redirects is not enabled.")

    root_segments = builder.script_root.split("/")

    if target_path.split("/")[: len(root_segments)] == root_segments:
        # Strip the script root from the path.
        builder.path = target_path[len(builder.script_root) :]
    else:
        # The new location is not under the script root, so use the
        # whole path and clear the previous root.
        builder.path = target_path
        builder.script_root = ""

    # Only 307 and 308 preserve all of the original request.
    keeps_original_request = response.status_code in {307, 308}

    if not keeps_original_request:
        # HEAD is preserved, everything else becomes GET.
        if builder.method != "HEAD":
            builder.method = "GET"

        # Clear the body and the headers that describe it.
        if builder.input_stream is not None:
            builder.input_stream.close()
            builder.input_stream = None

        builder.content_type = None
        builder.content_length = None
        builder.headers.pop("Transfer-Encoding", None)

    return self.open(builder, buffered=buffered)

1030 

def open(
    self,
    *args: t.Any,
    buffered: bool = False,
    follow_redirects: bool = False,
    **kwargs: t.Any,
) -> "TestResponse":
    """Build a request from the arguments, send it to the application,
    and return the wrapped response.

    :param args: Passed to :class:`EnvironBuilder` to create the
        environ for the request. If a single arg is passed and there
        are no keyword arguments, it can be an existing
        :class:`EnvironBuilder`, an environ dict, or a
        :class:`Request`.
    :param buffered: Convert the iterator returned by the app into
        a list. If the iterator has a ``close()`` method, it is
        called automatically.
    :param follow_redirects: Make additional requests to follow HTTP
        redirects until a non-redirect status is returned.
        :attr:`TestResponse.history` lists the intermediate
        responses.
    :raises ClientRedirectError: If the same location and status code
        pair is seen twice while following redirects.

    .. versionchanged:: 2.1
        Removed the ``as_tuple`` parameter.

    .. versionchanged:: 2.0
        ``as_tuple`` is deprecated and will be removed in Werkzeug
        2.1. Use :attr:`TestResponse.request` and
        ``request.environ`` instead.

    .. versionchanged:: 2.0
        The request input stream is closed when calling
        ``response.close()``. Input streams for redirects are
        automatically closed.

    .. versionchanged:: 0.5
        If a dict is provided as file in the dict for the ``data``
        parameter the content type has to be called ``content_type``
        instead of ``mimetype``. This change was made for
        consistency with :class:`werkzeug.FileWrapper`.

    .. versionchanged:: 0.5
        Added the ``follow_redirects`` parameter.
    """
    request: t.Optional["Request"] = None

    # A lone positional argument may already describe the whole request.
    if len(args) == 1 and not kwargs:
        (candidate,) = args

        if isinstance(candidate, EnvironBuilder):
            request = candidate.get_request()
        elif isinstance(candidate, dict):
            request = EnvironBuilder.from_environ(candidate).get_request()
        elif isinstance(candidate, Request):
            request = candidate

    if request is None:
        # Otherwise build the request here; this builder is closed
        # as soon as the request has been created.
        builder = EnvironBuilder(*args, **kwargs)

        try:
            request = builder.get_request()
        finally:
            builder.close()

    app_result = self.run_wsgi_app(request.environ, buffered=buffered)
    response = self.response_wrapper(*app_result, request=request)

    if not follow_redirects:
        return response

    seen_redirects = set()
    history: t.List["TestResponse"] = []

    while response.status_code in {301, 302, 303, 305, 307, 308}:
        # Exhaust intermediate response bodies to ensure middleware
        # that returns an iterator runs any cleanup code.
        if not buffered:
            response.make_sequence()
            response.close()

        redirect_key = (response.location, response.status_code)

        if redirect_key in seen_redirects:
            raise ClientRedirectError(
                f"Loop detected: A {response.status_code} redirect"
                f" to {response.location} was already made."
            )

        seen_redirects.add(redirect_key)
        response.history = tuple(history)
        history.append(response)
        response = self.resolve_redirect(response, buffered=buffered)

    # This is the final response after redirects.
    response.history = tuple(history)
    # Close the input stream when closing the response, in case
    # the input is an open temporary file.
    response.call_on_close(request.input_stream.close)
    return response

1136 

def get(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Call :meth:`open` with ``method`` set to ``GET``.

    Any ``method`` keyword passed by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "GET"})

1141 

def post(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Call :meth:`open` with ``method`` set to ``POST``.

    Any ``method`` keyword passed by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "POST"})

1146 

def put(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Call :meth:`open` with ``method`` set to ``PUT``.

    Any ``method`` keyword passed by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "PUT"})

1151 

def delete(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Call :meth:`open` with ``method`` set to ``DELETE``.

    Any ``method`` keyword passed by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "DELETE"})

1156 

def patch(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Call :meth:`open` with ``method`` set to ``PATCH``.

    Any ``method`` keyword passed by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "PATCH"})

1161 

def options(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Call :meth:`open` with ``method`` set to ``OPTIONS``.

    Any ``method`` keyword passed by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "OPTIONS"})

1166 

def head(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Call :meth:`open` with ``method`` set to ``HEAD``.

    Any ``method`` keyword passed by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "HEAD"})

1171 

def trace(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Call :meth:`open` with ``method`` set to ``TRACE``.

    Any ``method`` keyword passed by the caller is overridden.
    """
    return self.open(*args, **{**kw, "method": "TRACE"})

1176 

def __repr__(self) -> str:
    """Return ``<ClassName application-repr>`` for debugging output."""
    # Use the runtime type so subclasses show their own name.
    return f"<{type(self).__name__} {self.application!r}>"

1179 

1180 

def create_environ(*args: t.Any, **kwargs: t.Any) -> "WSGIEnvironment":
    """Build a new WSGI environ dict from the given values.

    The first parameter should be the path of the request, which
    defaults to '/'. The second one can either be an absolute path (in
    that case the host is localhost:80) or a full path to the request
    with scheme, netloc, port and the path to the script.

    This accepts the same arguments as the :class:`EnvironBuilder`
    constructor. The builder used internally is always closed before
    returning, even if creating the environ fails.

    .. versionchanged:: 0.5
       This function is now a thin wrapper over :class:`EnvironBuilder` which
       was added in 0.5.  The `headers`, `environ_base`, `environ_overrides`
       and `charset` parameters were added.
    """
    env_builder = EnvironBuilder(*args, **kwargs)

    try:
        environ = env_builder.get_environ()
    finally:
        env_builder.close()

    return environ

1202 

1203 

def run_wsgi_app(
    app: "WSGIApplication", environ: "WSGIEnvironment", buffered: bool = False
) -> t.Tuple[t.Iterable[bytes], str, Headers]:
    """Return a tuple in the form (app_iter, status, headers) of the
    application output.  This works best if you pass it an application that
    returns an iterator all the time.

    Sometimes applications may use the `write()` callable returned
    by the `start_response` function.  This tries to resolve such edge
    cases automatically.  But if you don't get the expected output you
    should set `buffered` to `True` which enforces buffering.

    In both modes, data passed to `write()` is placed ahead of the data
    produced by iterating the application's return value.

    If passed an invalid WSGI application the behavior of this function is
    undefined.  Never pass non-conforming WSGI applications to this function.

    :param app: the application to execute.
    :param environ: the WSGI environ for the request. The application
        is called with a copy of it.
    :param buffered: set to `True` to enforce buffering.
    :return: tuple in the form ``(app_iter, status, headers)``
    """
    # Copy environ to ensure any mutations by the app (ProxyFix, for
    # example) don't affect subsequent requests (such as redirects).
    environ = _get_environ(environ).copy()
    status: str
    response: t.Optional[t.Tuple[str, t.List[t.Tuple[str, str]]]] = None
    # Collects chunks sent through the ``write()`` callable and, in the
    # unbuffered case, the chunks consumed while waiting for the app to
    # call ``start_response``.
    buffer: t.List[bytes] = []

    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal response

        if exc_info:
            try:
                raise exc_info[1].with_traceback(exc_info[2])
            finally:
                # Drop the reference to avoid keeping the traceback alive.
                exc_info = None

        response = (status, headers)
        # ``buffer.append`` serves as the WSGI ``write()`` callable.
        return buffer.append

    app_rv = app(environ, start_response)
    close_func = getattr(app_rv, "close", None)
    app_iter: t.Iterable[bytes] = iter(app_rv)

    # when buffering we emit the close call early and convert the
    # application iterator into a regular list
    if buffered:
        try:
            iterated = list(app_iter)
            # Include anything the app sent through ``write()``;
            # previously that data was silently dropped in buffered
            # mode. ``buffer`` is read after iteration finished so
            # writes made while iterating are kept as well.
            app_iter = buffer + iterated
        finally:
            if close_func is not None:
                close_func()

    # otherwise we iterate the application iter until we have a response, chain
    # the already received data with the already collected data and wrap it in
    # a new `ClosingIterator` if we need to restore a `close` callable from the
    # original return value.
    else:
        for item in app_iter:
            buffer.append(item)

            if response is not None:
                break

        if buffer:
            app_iter = chain(buffer, app_iter)

        if close_func is not None and app_iter is not app_rv:
            app_iter = ClosingIterator(app_iter, close_func)

    status, headers = response  # type: ignore
    return app_iter, status, Headers(headers)

1274 

1275 

class TestResponse(Response):
    """:class:`~werkzeug.wrappers.Response` subclass that provides extra
    information about requests made with the test :class:`Client`.

    Test client requests will always return an instance of this class.
    If a custom response class is passed to the client, it is
    subclassed along with this to support test information.

    If the test request included large files, or if the application is
    serving a file, call :meth:`close` to close any open files and
    prevent Python showing a ``ResourceWarning``.

    .. versionchanged:: 2.2
        Set the ``default_mimetype`` to None to prevent a mimetype being
        assumed if missing.

    .. versionchanged:: 2.1
        Removed deprecated behavior for treating the response instance
        as a tuple.

    .. versionadded:: 2.0
        Test client methods always return instances of this class.
    """

    default_mimetype = None
    # Don't assume a mimetype, instead use whatever the response provides

    request: Request
    """A request object with the environ used to make the request that
    resulted in this response.
    """

    history: t.Tuple["TestResponse", ...]
    """A tuple of intermediate responses. Populated when the test request
    is made with ``follow_redirects`` enabled.
    """

    # Tell Pytest to ignore this, it's not a test class.
    __test__ = False

    def __init__(
        self,
        response: t.Iterable[bytes],
        status: str,
        headers: Headers,
        request: Request,
        history: t.Tuple["TestResponse", ...] = (),
        **kwargs: t.Any,
    ) -> None:
        """Wrap the output of a WSGI application call.

        :param response: The iterable of body chunks.
        :param status: The status returned by the application.
        :param headers: The response headers.
        :param request: The request that produced this response.
        :param history: Intermediate responses that led to this one.
        :param kwargs: Passed on to the parent class constructor.
        """
        super().__init__(response, status, headers, **kwargs)
        self.request = request
        self.history = history
        # Not read anywhere in this class; presumably left over from
        # the removed tuple behavior. Kept in case external code still
        # reads it.
        self._compat_tuple = response, status, headers

    @cached_property
    def text(self) -> str:
        """The response data as text. A shortcut for
        ``response.get_data(as_text=True)``.

        .. versionadded:: 2.1
        """
        return self.get_data(as_text=True)