Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/test.py: 46%

568 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:03 +0000

1import mimetypes 

2import sys 

3import typing as t 

4from collections import defaultdict 

5from datetime import datetime 

6from datetime import timedelta 

7from http.cookiejar import CookieJar 

8from io import BytesIO 

9from itertools import chain 

10from random import random 

11from tempfile import TemporaryFile 

12from time import time 

13from urllib.request import Request as _UrllibRequest 

14 

15from ._internal import _get_environ 

16from ._internal import _make_encode_wrapper 

17from ._internal import _wsgi_decoding_dance 

18from ._internal import _wsgi_encoding_dance 

19from .datastructures import Authorization 

20from .datastructures import CallbackDict 

21from .datastructures import CombinedMultiDict 

22from .datastructures import EnvironHeaders 

23from .datastructures import FileMultiDict 

24from .datastructures import Headers 

25from .datastructures import MultiDict 

26from .http import dump_cookie 

27from .http import dump_options_header 

28from .http import parse_options_header 

29from .sansio.multipart import Data 

30from .sansio.multipart import Epilogue 

31from .sansio.multipart import Field 

32from .sansio.multipart import File 

33from .sansio.multipart import MultipartEncoder 

34from .sansio.multipart import Preamble 

35from .urls import iri_to_uri 

36from .urls import url_encode 

37from .urls import url_fix 

38from .urls import url_parse 

39from .urls import url_unparse 

40from .urls import url_unquote 

41from .utils import cached_property 

42from .utils import get_content_type 

43from .wrappers.request import Request 

44from .wrappers.response import Response 

45from .wsgi import ClosingIterator 

46from .wsgi import get_current_url 

47 

48if t.TYPE_CHECKING: 

49 from _typeshed.wsgi import WSGIApplication 

50 from _typeshed.wsgi import WSGIEnvironment 

51 

52 

def stream_encode_multipart(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: t.Optional[str] = None,
    charset: str = "utf-8",
) -> t.Tuple[t.IO[bytes], int, str]:
    """Serialize ``data`` as a multipart body and return it as a stream.

    Values that expose a ``read`` attribute are treated as files and
    copied in 16 KiB chunks; every other value is converted with
    ``str()`` if necessary and encoded with ``charset``.

    :param data: Mapping of field names to values. Lists of values are
        expanded by :func:`_iter_data`.
    :param use_tempfile: When true, the output starts in memory and is
        moved to a :func:`~tempfile.TemporaryFile` as soon as a write
        would push the total past ``threshold``.
    :param threshold: Number of bytes kept in memory before spilling to
        the temporary file. Only consulted when ``use_tempfile`` is true.
    :param boundary: Multipart boundary. Generated from the current time
        and a random number when not given.
    :param charset: Encoding applied to plain (non-file) values.
    :return: ``(stream, length, boundary)`` where ``stream`` has been
        rewound to position 0 and ``length`` is the position it had
        before rewinding.
    """
    if boundary is None:
        boundary = f"---------------WerkzeugFormPart_{time()}{random()}"

    stream: t.IO[bytes] = BytesIO()
    buffered = 0
    spilled = False
    write_binary: t.Callable[[bytes], int]

    def _spooling_write(payload: bytes) -> int:
        # Writer used when ``use_tempfile`` is true: keeps data in the
        # BytesIO until the threshold would be exceeded, then swaps the
        # outer ``stream`` for a temporary file holding everything so far.
        nonlocal stream, buffered, spilled

        if spilled:
            return stream.write(payload)

        size = len(payload)

        if buffered + size > threshold:
            disk = t.cast(t.IO[bytes], TemporaryFile("wb+"))
            disk.write(stream.getvalue())  # type: ignore
            disk.write(payload)
            stream = disk
            spilled = True
        else:
            stream.write(payload)

        buffered += size
        return size

    # Without spooling, write straight into the in-memory buffer.
    write_binary = _spooling_write if use_tempfile else stream.write

    encoder = MultipartEncoder(boundary.encode())

    def _emit(event: t.Any) -> None:
        # Encode one multipart event and append its bytes to the output.
        write_binary(encoder.send_event(event))

    _emit(Preamble(data=b""))

    for key, value in _iter_data(data):
        reader = getattr(value, "read", None)

        if reader is None:
            # Plain form field.
            text = value if isinstance(value, str) else str(value)
            _emit(Field(name=key, headers=Headers()))
            _emit(Data(data=text.encode(charset), more_data=False))
            continue

        # File-like value: prefer ``filename``, fall back to ``name``.
        filename = getattr(value, "filename", getattr(value, "name", None))
        content_type = getattr(value, "content_type", None)

        if content_type is None:
            guessed = mimetypes.guess_type(filename)[0] if filename else None
            content_type = guessed or "application/octet-stream"

        headers = Headers([("Content-Type", content_type)])

        if filename is None:
            _emit(Field(name=key, headers=headers))
        else:
            _emit(File(name=key, filename=filename, headers=headers))

        # Every chunk is sent with ``more_data=True``; reading stops at the
        # first empty chunk.
        chunk = reader(16384)

        while chunk:
            _emit(Data(data=chunk, more_data=True))
            chunk = reader(16384)

    _emit(Epilogue(data=b""))

    length = stream.tell()
    stream.seek(0)
    return stream, length, boundary

139 

140 

def encode_multipart(
    values: t.Mapping[str, t.Any],
    boundary: t.Optional[str] = None,
    charset: str = "utf-8",
) -> t.Tuple[str, bytes]:
    """In-memory variant of :func:`stream_encode_multipart`.

    :param values: Mapping of field names to values.
    :param boundary: Multipart boundary, forwarded unchanged.
    :param charset: Encoding for plain values, forwarded unchanged.
    :return: ``(boundary, data)`` where ``data`` is the encoded body as
        bytes.
    """
    encoded = stream_encode_multipart(
        values, use_tempfile=False, boundary=boundary, charset=charset
    )
    # The middle element (the length) is not needed here.
    stream, used_boundary = encoded[0], encoded[2]
    return used_boundary, stream.read()

153 

154 

class _TestCookieHeaders:
    """Header adapter for cookielib.

    Wraps an iterable of ``(name, value)`` pairs and offers
    case-insensitive lookup through :meth:`getheaders` and
    :meth:`get_all`.
    """

    def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
        self.headers = headers

    def getheaders(self, name: str) -> t.Iterable[str]:
        """Return every value whose header name equals ``name``,
        ignoring case, as a list.
        """
        wanted = name.lower()
        return [value for key, value in self.headers if key.lower() == wanted]

    def get_all(
        self, name: str, default: t.Optional[t.Iterable[str]] = None
    ) -> t.Iterable[str]:
        """Like :meth:`getheaders`, but return ``default`` when no
        header matches.
        """
        found = self.getheaders(name)
        return found if found else default  # type: ignore

178 

179 

class _TestCookieResponse:
    """Minimal stand-in for an ``httplib.HTTPResponse``.

    It only adapts the headers of a test response so cookielib can
    read them.
    """

    def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
        # Wrap the raw headers so they are reachable through ``info()``.
        self.headers = _TestCookieHeaders(headers)

    def info(self) -> _TestCookieHeaders:
        """Return the wrapped headers."""
        return self.headers

190 

191 

class _TestCookieJar(CookieJar):
    """A :class:`~http.cookiejar.CookieJar` that can write its cookies
    into a WSGI environ and collect cookies from WSGI response headers.
    """

    def inject_wsgi(self, environ: "WSGIEnvironment") -> None:
        """Store the jar's cookies in ``environ["HTTP_COOKIE"]``.

        The key is removed from ``environ`` when the jar holds no
        cookies.
        """
        rendered = [f"{cookie.name}={cookie.value}" for cookie in self]

        if not rendered:
            environ.pop("HTTP_COOKIE", None)
            return

        environ["HTTP_COOKIE"] = "; ".join(rendered)

    def extract_wsgi(
        self,
        environ: "WSGIEnvironment",
        headers: t.Union[Headers, t.List[t.Tuple[str, str]]],
    ) -> None:
        """Read the cookies set by the response ``headers`` into the jar.

        The request URL handed to cookielib is rebuilt from ``environ``.
        """
        response = _TestCookieResponse(headers)
        request = _UrllibRequest(get_current_url(environ))
        self.extract_cookies(response, request)  # type: ignore

220 

221 

def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[t.Tuple[str, t.Any]]:
    """Yield every ``(key, value)`` pair of a mapping whose values may
    be lists of values.

    Similar to ``iter_multi_items``, except that only lists are
    expanded. Tuples are yielded as-is so they can describe files.
    """
    if isinstance(data, MultiDict):
        yield from data.items(multi=True)
        return

    for key, value in data.items():
        if isinstance(value, list):
            yield from ((key, item) for item in value)
        else:
            yield key, value

236 

237 

# Type variable restricted to ``MultiDict`` and its subclasses. It lets a
# function that receives a storage class be annotated as returning an
# instance of that same class.
_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound=MultiDict)

239 

240 

class EnvironBuilder:
    """Convenient construction of WSGI environments for tests.

    Use it to quickly build a WSGI environ, or a request object, from
    arbitrary data.

    Since Werkzeug 0.5 the constructor signature is shared with
    :func:`create_environ`, :meth:`Response.from_values` and
    :meth:`Client.open`, which is why most functionality is reachable
    through the constructor alone.

    Form values and files are exposed separately as :attr:`form` and
    :attr:`files`, but both are supplied through the single ``data``
    constructor argument.

    ``data`` may be one of:

    -   a ``str`` or ``bytes`` object: it becomes the
        :attr:`input_stream`, :attr:`content_length` is set, and you
        have to provide a :attr:`content_type`.
    -   a ``dict`` or :class:`MultiDict` with string keys. Each value is
        one of the following, or a list of them:

        -   a file-like object: converted to a :class:`FileStorage`
            automatically.
        -   a ``tuple``: :meth:`~FileMultiDict.add_file` is called with
            the key followed by the unpacked tuple items.
        -   a ``str``: stored as form data under the key.

    -   a file-like object: its content is read into memory and then
        treated like a ``str`` or ``bytes`` value.

    :param path: the request path; ends up as ``PATH_INFO``. If
        ``query_string`` is not given and the path contains a question
        mark, everything after it is used as the query string.
    :param base_url: URL from which the WSGI URL scheme, the host
        (server name + server port) and the script root
        (``SCRIPT_NAME``) are taken.
    :param query_string: an optional string or dict with URL parameters.
    :param method: the HTTP method, ``GET`` by default.
    :param input_stream: an optional input stream. Do not pass this
        together with ``data``. While an input stream is set,
        :attr:`args` and :attr:`files` cannot be modified until
        :attr:`input_stream` is set back to ``None``.
    :param content_type: the content type of the request. As of 0.5 it
        is not required when files and form data are given via ``data``.
    :param content_length: the content length of the request. Not
        required when the body is given via ``data``.
    :param errors_stream: stream used for ``wsgi.errors``; defaults to
        :data:`stderr`.
    :param multithread: value of ``wsgi.multithread``, default ``False``.
    :param multiprocess: value of ``wsgi.multiprocess``, default
        ``False``.
    :param run_once: value of ``wsgi.run_once``, default ``False``.
    :param headers: an optional list or :class:`Headers` object.
    :param data: a string, a dict of form data, or a file object, as
        described above.
    :param json: an object that is serialized with :attr:`json_dumps`
        and used as ``data``. The content type defaults to
        ``"application/json"``.
    :param environ_base: an optional dict of environ defaults.
    :param environ_overrides: an optional dict of environ overrides.
    :param charset: the charset used to encode string data.
    :param mimetype: when given, assigned to :attr:`mimetype` after all
        other arguments have been processed.
    :param auth: an authorization object for the ``Authorization``
        header. A ``(username, password)`` tuple is a shortcut for
        ``Basic`` authorization.

    .. versionchanged:: 2.1
        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
        header keys in the environ.

    .. versionchanged:: 2.0
        ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
        the query string, not only the path.

    .. versionchanged:: 2.0
        The default :attr:`request_class` is ``Request`` instead of
        ``BaseRequest``.

    .. versionadded:: 2.0
       Added the ``auth`` parameter.

    .. versionadded:: 0.15
        The ``json`` param and :meth:`json_dumps` method.

    .. versionadded:: 0.15
        The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
        the path before percent-decoding. This is not part of the WSGI
        PEP, but many WSGI servers include it.

    .. versionchanged:: 0.6
       ``path`` and ``base_url`` can now be unicode strings that are
       encoded with :func:`iri_to_uri`.
    """

    #: Server protocol placed in the environ; ``HTTP/1.1`` by default.
    server_protocol = "HTTP/1.1"

    #: WSGI version placed in the environ; ``(1, 0)`` by default.
    wsgi_version = (1, 0)

    #: Request class used by :meth:`get_request` when none is passed.
    request_class = Request

    # Imported inside the class body and deleted again right after use so
    # that only ``json_dumps`` remains as a class attribute.
    import json

    #: Function used to serialize the ``json`` constructor argument.
    json_dumps = staticmethod(json.dumps)
    del json

    _args: t.Optional[MultiDict]
    _query_string: t.Optional[str]
    _input_stream: t.Optional[t.IO[bytes]]
    _form: t.Optional[MultiDict]
    _files: t.Optional[FileMultiDict]

    def __init__(
        self,
        path: str = "/",
        base_url: t.Optional[str] = None,
        query_string: t.Optional[t.Union[t.Mapping[str, str], str]] = None,
        method: str = "GET",
        input_stream: t.Optional[t.IO[bytes]] = None,
        content_type: t.Optional[str] = None,
        content_length: t.Optional[int] = None,
        errors_stream: t.Optional[t.IO[str]] = None,
        multithread: bool = False,
        multiprocess: bool = False,
        run_once: bool = False,
        headers: t.Optional[t.Union[Headers, t.Iterable[t.Tuple[str, str]]]] = None,
        data: t.Optional[
            t.Union[t.IO[bytes], str, bytes, t.Mapping[str, t.Any]]
        ] = None,
        environ_base: t.Optional[t.Mapping[str, t.Any]] = None,
        environ_overrides: t.Optional[t.Mapping[str, t.Any]] = None,
        charset: str = "utf-8",
        mimetype: t.Optional[str] = None,
        json: t.Optional[t.Mapping[str, t.Any]] = None,
        auth: t.Optional[t.Union[Authorization, t.Tuple[str, str]]] = None,
    ) -> None:
        encode = _make_encode_wrapper(path)

        if query_string is not None and encode("?") in path:
            raise ValueError("Query string is defined in the path and as an argument")

        parsed_path = url_parse(path)

        if query_string is None and encode("?") in path:
            query_string = parsed_path.query

        self.charset = charset
        self.path = iri_to_uri(parsed_path.path)
        self.request_uri = path

        if base_url is not None:
            base_url = url_fix(iri_to_uri(base_url, charset), charset)

        # The setter splits the URL into url_scheme, host and script_root.
        self.base_url = base_url  # type: ignore

        if isinstance(query_string, (bytes, str)):
            self.query_string = query_string
        else:
            if query_string is None:
                query_string = MultiDict()
            elif not isinstance(query_string, MultiDict):
                query_string = MultiDict(query_string)

            self.args = query_string

        self.method = method

        if headers is None:
            self.headers = Headers()
        elif isinstance(headers, Headers):
            self.headers = headers
        else:
            self.headers = Headers(headers)

        if content_type is not None:
            self.content_type = content_type

        self.errors_stream = sys.stderr if errors_stream is None else errors_stream
        self.multithread = multithread
        self.multiprocess = multiprocess
        self.run_once = run_once
        self.environ_base = environ_base
        self.environ_overrides = environ_overrides
        # The setter also resets the cached form and files storages.
        self.input_stream = input_stream
        self.content_length = content_length
        self.closed = False

        if auth is not None:
            if isinstance(auth, tuple):
                username, password = auth[0], auth[1]
                auth = Authorization(
                    "basic", {"username": username, "password": password}
                )

            self.headers.set("Authorization", auth.to_header())

        if json is not None:
            if data is not None:
                raise TypeError("can't provide both json and data")

            data = self.json_dumps(json)

            if self.content_type is None:
                self.content_type = "application/json"

        if data:
            if input_stream is not None:
                raise TypeError("can't provide input stream and data")

            if hasattr(data, "read"):
                data = data.read()  # type: ignore

            if isinstance(data, str):
                data = data.encode(self.charset)

            if not isinstance(data, bytes):
                # A mapping: sort each value into files or form fields.
                for key, value in _iter_data(data):  # type: ignore
                    looks_like_file = isinstance(value, (tuple, dict)) or hasattr(
                        value, "read"
                    )

                    if looks_like_file:
                        self._add_file_from_data(key, value)
                    else:
                        self.form.setlistdefault(key).append(value)
            else:
                self.input_stream = BytesIO(data)

                if self.content_length is None:
                    self.content_length = len(data)

        if mimetype is not None:
            self.mimetype = mimetype

    @classmethod
    def from_environ(
        cls, environ: "WSGIEnvironment", **kwargs: t.Any
    ) -> "EnvironBuilder":
        """Create a builder from an existing environ dict. Extra
        ``kwargs`` take precedence over the values read from the
        environ.

        .. versionchanged:: 2.0
            Path and query values are passed through the WSGI decoding
            dance to avoid double encoding.

        .. versionadded:: 0.15
        """
        headers = Headers(EnvironHeaders(environ))
        path = _wsgi_decoding_dance(environ["PATH_INFO"])
        # Host, Content-Type and Content-Length are popped from the headers
        # because they are passed as dedicated arguments.
        base_url = cls._make_base_url(
            environ["wsgi.url_scheme"],
            headers.pop("Host"),
            _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
        )
        extracted = {
            "path": path,
            "base_url": base_url,
            "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
            "method": environ["REQUEST_METHOD"],
            "input_stream": environ["wsgi.input"],
            "content_type": headers.pop("Content-Type", None),
            "content_length": headers.pop("Content-Length", None),
            "errors_stream": environ["wsgi.errors"],
            "multithread": environ["wsgi.multithread"],
            "multiprocess": environ["wsgi.multiprocess"],
            "run_once": environ["wsgi.run_once"],
            "headers": headers,
        }
        return cls(**{**extracted, **kwargs})

    def _add_file_from_data(
        self,
        key: str,
        value: t.Union[
            t.IO[bytes], t.Tuple[t.IO[bytes], str], t.Tuple[t.IO[bytes], str, str]
        ],
    ) -> None:
        """Add one file taken from the ``data`` dict to :attr:`files`.

        A tuple is unpacked into the positional arguments of
        ``add_file``; any other value is passed as the single file
        argument.
        """
        file_args = value if isinstance(value, tuple) else (value,)
        self.files.add_file(key, *file_args)

    @staticmethod
    def _make_base_url(scheme: str, host: str, script_root: str) -> str:
        # Join the parts and normalize the result to end in exactly one "/".
        joined = url_unparse((scheme, host, script_root, "", ""))
        return joined.rstrip("/") + "/"

    @property
    def base_url(self) -> str:
        """The base URL, from which the URL scheme, host name, port and
        root path are taken.
        """
        return self._make_base_url(self.url_scheme, self.host, self.script_root)

    @base_url.setter
    def base_url(self, value: t.Optional[str]) -> None:
        if value is None:
            scheme, netloc, script_root = "http", "localhost", ""
        else:
            scheme, netloc, script_root, qs, anchor = url_parse(value)

            if qs or anchor:
                raise ValueError("base url must not contain a query string or fragment")

        self.script_root = script_root.rstrip("/")
        self.host = netloc
        self.url_scheme = scheme

    @property
    def content_type(self) -> t.Optional[str]:
        """The content type of the request, mirrored in
        :attr:`headers`. Leave it unset when using :attr:`files` or
        :attr:`form` so it can be detected automatically.
        """
        header = self.headers.get("Content-Type")

        if header is not None or self._input_stream:
            return header

        # No explicit header and no input stream: infer from stored data.
        if self._files:
            return "multipart/form-data"

        if self._form:
            return "application/x-www-form-urlencoded"

        return None

    @content_type.setter
    def content_type(self, value: t.Optional[str]) -> None:
        if value is not None:
            self.headers["Content-Type"] = value
        else:
            self.headers.pop("Content-Type", None)

    @property
    def mimetype(self) -> t.Optional[str]:
        """The mimetype (the content type without charset and other
        parameters).

        .. versionadded:: 0.14
        """
        content_type = self.content_type

        if not content_type:
            return None

        return content_type.partition(";")[0].strip()

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        self.content_type = get_content_type(value, self.charset)

    @property
    def mimetype_params(self) -> t.Mapping[str, str]:
        """The parameters of the content type as a dict. For
        ``text/html; charset=utf-8`` this is ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.14
        """

        def write_back(params: CallbackDict) -> None:
            # Rebuild the header whenever the returned dict is modified.
            self.headers["Content-Type"] = dump_options_header(self.mimetype, params)

        options = parse_options_header(self.headers.get("content-type", ""))[1]
        return CallbackDict(options, write_back)

    @property
    def content_length(self) -> t.Optional[int]:
        """The content length as an integer, mirrored in
        :attr:`headers`. Leave it unset when using :attr:`files` or
        :attr:`form` so it can be detected automatically.
        """
        return self.headers.get("Content-Length", type=int)

    @content_length.setter
    def content_length(self, value: t.Optional[int]) -> None:
        if value is not None:
            self.headers["Content-Length"] = str(value)
        else:
            self.headers.pop("Content-Length", None)

    def _get_form(self, name: str, storage: t.Type[_TAnyMultiDict]) -> _TAnyMultiDict:
        """Shared getter logic for the :attr:`form` and :attr:`files`
        properties.

        :param name: Name of the internal cached attribute.
        :param storage: Storage class instantiated when the cached
            attribute is still ``None``.
        :raises AttributeError: if an input stream is defined.
        """
        if self.input_stream is not None:
            raise AttributeError("an input stream is defined")

        cached = getattr(self, name)

        if cached is None:
            cached = storage()
            setattr(self, name, cached)

        return cached  # type: ignore

    def _set_form(self, name: str, value: MultiDict) -> None:
        """Shared setter logic for the :attr:`form` and :attr:`files`
        properties. Clears the input stream.

        :param name: Name of the internal cached attribute.
        :param value: Value to assign to the attribute.
        """
        self._input_stream = None
        setattr(self, name, value)

    @property
    def form(self) -> MultiDict:
        """The form values as a :class:`MultiDict`."""
        return self._get_form("_form", MultiDict)

    @form.setter
    def form(self, value: MultiDict) -> None:
        self._set_form("_form", value)

    @property
    def files(self) -> FileMultiDict:
        """The uploaded files as a :class:`FileMultiDict`. Add new
        files with :meth:`~FileMultiDict.add_file`.
        """
        return self._get_form("_files", FileMultiDict)

    @files.setter
    def files(self, value: FileMultiDict) -> None:
        self._set_form("_files", value)

    @property
    def input_stream(self) -> t.Optional[t.IO[bytes]]:
        """An optional input stream. It is mutually exclusive with
        :attr:`form` and :attr:`files`; assigning it clears both. Do
        not provide it unless the method is ``POST`` or another method
        that has a body.
        """
        return self._input_stream

    @input_stream.setter
    def input_stream(self, value: t.Optional[t.IO[bytes]]) -> None:
        self._input_stream = value
        self._form = None
        self._files = None

    @property
    def query_string(self) -> str:
        """The query string. Once this is set to a string,
        :attr:`args` is no longer available.
        """
        if self._query_string is not None:
            return self._query_string

        if self._args is None:
            return ""

        return url_encode(self._args, charset=self.charset)

    @query_string.setter
    def query_string(self, value: t.Optional[str]) -> None:
        self._query_string = value
        self._args = None

    @property
    def args(self) -> MultiDict:
        """The URL arguments as :class:`MultiDict`."""
        if self._query_string is not None:
            raise AttributeError("a query string is defined")

        if self._args is None:
            self._args = MultiDict()

        return self._args

    @args.setter
    def args(self, value: t.Optional[MultiDict]) -> None:
        self._query_string = None
        self._args = value

    @property
    def server_name(self) -> str:
        """The server name (read-only, use :attr:`host` to set)"""
        return self.host.partition(":")[0]

    @property
    def server_port(self) -> int:
        """The server port as integer (read-only, use :attr:`host` to set)"""
        _, colon, port = self.host.partition(":")

        if colon and port.isdigit():
            return int(port)

        # No usable port in the host: fall back to the scheme's default.
        return 443 if self.url_scheme == "https" else 80

    def __del__(self) -> None:
        # Best-effort cleanup; errors are deliberately ignored here.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """Close every file in :attr:`files`. If real file objects were
        put into the :attr:`files` dict, this closes them all in one
        go. Calling it again afterwards does nothing.
        """
        if self.closed:
            return

        try:
            open_files = self.files.values()
        except AttributeError:
            # ``files`` is unavailable, e.g. while an input stream is set.
            open_files = ()  # type: ignore

        for handle in open_files:
            try:
                handle.close()
            except Exception:
                pass

        self.closed = True

    def get_environ(self) -> "WSGIEnvironment":
        """Return the built environ.

        .. versionchanged:: 0.15
            The content type and length headers are set based on
            input stream detection. Previously this only set the WSGI
            keys.
        """
        body = self.input_stream
        content_length = self.content_length

        mimetype = self.mimetype
        content_type = self.content_type

        if body is not None:
            # Length is what remains between the current position and the
            # end of the stream; the position is restored afterwards.
            start = body.tell()
            body.seek(0, 2)
            end = body.tell()
            body.seek(start)
            content_length = end - start
        elif mimetype == "multipart/form-data":
            body, content_length, boundary = stream_encode_multipart(
                CombinedMultiDict([self.form, self.files]), charset=self.charset
            )
            content_type = f'{mimetype}; boundary="{boundary}"'
        elif mimetype == "application/x-www-form-urlencoded":
            encoded_form = url_encode(self.form, charset=self.charset).encode("ascii")
            content_length = len(encoded_form)
            body = BytesIO(encoded_form)
        else:
            body = BytesIO()

        environ: "WSGIEnvironment" = {}

        if self.environ_base:
            environ.update(self.environ_base)

        def _path_encode(x: str) -> str:
            return _wsgi_encoding_dance(url_unquote(x, self.charset), self.charset)

        raw_uri = _wsgi_encoding_dance(self.request_uri, self.charset)
        environ.update(
            {
                "REQUEST_METHOD": self.method,
                "SCRIPT_NAME": _path_encode(self.script_root),
                "PATH_INFO": _path_encode(self.path),
                "QUERY_STRING": _wsgi_encoding_dance(self.query_string, self.charset),
                # Non-standard, added by mod_wsgi, uWSGI
                "REQUEST_URI": raw_uri,
                # Non-standard, added by gunicorn
                "RAW_URI": raw_uri,
                "SERVER_NAME": self.server_name,
                "SERVER_PORT": str(self.server_port),
                "HTTP_HOST": self.host,
                "SERVER_PROTOCOL": self.server_protocol,
                "wsgi.version": self.wsgi_version,
                "wsgi.url_scheme": self.url_scheme,
                "wsgi.input": body,
                "wsgi.errors": self.errors_stream,
                "wsgi.multithread": self.multithread,
                "wsgi.multiprocess": self.multiprocess,
                "wsgi.run_once": self.run_once,
            }
        )

        extra_headers = self.headers.copy()
        # Content type and length travel as dedicated environ keys, not as
        # HTTP_* header keys.
        extra_headers.remove("Content-Type")
        extra_headers.remove("Content-Length")

        if content_type is not None:
            environ["CONTENT_TYPE"] = content_type

        if content_length is not None:
            environ["CONTENT_LENGTH"] = str(content_length)

        # Group repeated headers under one HTTP_* key each.
        grouped = defaultdict(list)

        for key, value in extra_headers.to_wsgi_list():
            grouped[f"HTTP_{key.upper().replace('-', '_')}"].append(value)

        environ.update((key, ", ".join(values)) for key, values in grouped.items())

        if self.environ_overrides:
            environ.update(self.environ_overrides)

        return environ

    def get_request(self, cls: t.Optional[t.Type[Request]] = None) -> Request:
        """Return a request object built from :meth:`get_environ`.
        :attr:`request_class` is used when no class is given.

        :param cls: The request wrapper to use.
        """
        request_cls = self.request_class if cls is None else cls
        return request_cls(self.get_environ())

829 

830 

class ClientRedirectError(Exception):
    """Raised by :class:`Client` when ``follow_redirects=True`` is used
    and a redirect loop is detected.
    """

835 

836 

837class Client: 

838 """This class allows you to send requests to a wrapped application. 

839 

840 The use_cookies parameter indicates whether cookies should be stored and 

841 sent for subsequent requests. This is True by default, but passing False 

842 will disable this behaviour. 

843 

844 If you want to request some subdomain of your application you may set 

845 `allow_subdomain_redirects` to `True`; otherwise no external redirects 

846 are allowed. 

847 

848 .. versionchanged:: 2.1 

849 Removed deprecated behavior of treating the response as a 

850 tuple. All data is available as properties on the returned 

851 response object. 

852 

853 .. versionchanged:: 2.0 

854 ``response_wrapper`` is always a subclass of 

855 :class:`TestResponse`. 

856 

857 .. versionchanged:: 0.5 

858 Added the ``use_cookies`` parameter. 

859 """ 

860 

def __init__(
    self,
    application: "WSGIApplication",
    response_wrapper: t.Optional[t.Type["Response"]] = None,
    use_cookies: bool = True,
    allow_subdomain_redirects: bool = False,
) -> None:
    """Set up the client for ``application``.

    :param application: The WSGI application to send requests to.
    :param response_wrapper: Response class to use. ``None`` and
        ``Response`` select ``TestResponse``. A class that already
        derives from ``TestResponse`` is used unchanged. Any other
        class is combined with ``TestResponse`` into a new
        ``WrapperTestResponse`` class.
    :param use_cookies: When true, a cookie jar is created; otherwise
        :attr:`cookie_jar` is ``None``.
    :param allow_subdomain_redirects: Stored on the instance as
        :attr:`allow_subdomain_redirects`.
    """
    self.application = application

    if response_wrapper in {None, Response}:
        response_wrapper = TestResponse
    elif not issubclass(response_wrapper, TestResponse):
        # ``response_wrapper`` is a class, so this must be a subclass
        # check. An instance check is always false for a class, and
        # mixing ``TestResponse`` with itself or one of its subclasses
        # makes ``type()`` raise ``TypeError``.
        response_wrapper = type(
            "WrapperTestResponse",
            (TestResponse, response_wrapper),  # type: ignore
            {},
        )

    self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper)

    if use_cookies:
        self.cookie_jar: t.Optional[_TestCookieJar] = _TestCookieJar()
    else:
        self.cookie_jar = None

    self.allow_subdomain_redirects = allow_subdomain_redirects

887 

def set_cookie(
    self,
    server_name: str,
    key: str,
    value: str = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: str = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    samesite: t.Optional[str] = None,
    charset: str = "utf-8",
) -> None:
    """Store a cookie in the client's cookie jar. ``server_name`` is
    required and has to match the one that is also passed to the open
    call.

    The cookie is rendered as a ``Set-Cookie`` header and fed to the
    jar together with an environ built for ``http://{server_name}``
    and ``path``.
    """
    jar = self.cookie_jar
    assert jar is not None, "cookies disabled"
    cookie_header = dump_cookie(
        key,
        value,
        max_age,
        expires,
        path,
        domain,
        secure,
        httponly,
        charset,
        samesite=samesite,
    )
    jar.extract_wsgi(
        create_environ(path, base_url=f"http://{server_name}"),
        [("Set-Cookie", cookie_header)],
    )

922 

def delete_cookie(
    self,
    server_name: str,
    key: str,
    path: str = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    samesite: t.Optional[str] = None,
) -> None:
    """Remove a cookie from the test client.

    Implemented by calling :meth:`set_cookie` for the same key with
    ``expires`` and ``max_age`` both set to ``0``; the remaining
    arguments are forwarded unchanged.
    """
    already_expired = {"expires": 0, "max_age": 0}
    attributes = {
        "path": path,
        "domain": domain,
        "secure": secure,
        "httponly": httponly,
        "samesite": samesite,
    }
    self.set_cookie(server_name, key, **already_expired, **attributes)

945 

def run_wsgi_app(
    self, environ: "WSGIEnvironment", buffered: bool = False
) -> t.Tuple[t.Iterable[bytes], str, Headers]:
    """Run the wrapped WSGI application for ``environ``.

    When the client has a cookie jar, it is applied to the environ
    before the call and updated from the response headers afterwards.
    The result of the module-level :func:`run_wsgi_app` is returned
    unchanged.

    :meta private:
    """
    if self.cookie_jar is not None:
        self.cookie_jar.inject_wsgi(environ)

    result = run_wsgi_app(self.application, environ, buffered=buffered)

    if self.cookie_jar is not None:
        # The third item of the result holds the response headers.
        response_headers = result[2]
        self.cookie_jar.extract_wsgi(environ, response_headers)

    return result

962 

def resolve_redirect(
    self, response: "TestResponse", buffered: bool = False
) -> "TestResponse":
    """Make the follow-up request for a redirect response.

    The new request is derived from the environ of the request that
    produced ``response``, pointed at ``response.location``.

    :raises RuntimeError: If the redirect targets a subdomain while
        ``allow_subdomain_redirects`` is off, or targets any other
        server name.

    :meta private:
    """
    scheme, netloc, path, qs, _anchor = url_parse(response.location)
    builder = EnvironBuilder.from_environ(
        response.request.environ, path=path, query_string=qs
    )

    target_host = netloc.split(":", 1)[0]
    to_name_parts = target_host.split(".")
    from_name_parts = builder.server_name.split(".")

    if target_host:
        # The new location has a host, use it for the base URL.
        builder.url_scheme = scheme
        builder.host = netloc
    else:
        # A local redirect with autocorrect_location_header=False
        # doesn't have a host, so use the request's host.
        to_name_parts = from_name_parts

    # Explain why a redirect to a different server name won't be followed.
    if to_name_parts != from_name_parts:
        is_subdomain = to_name_parts[-len(from_name_parts) :] == from_name_parts

        if not is_subdomain:
            raise RuntimeError("Following external redirects is not supported.")

        if not self.allow_subdomain_redirects:
            raise RuntimeError("Following subdomain redirects is not enabled.")

    script_root = builder.script_root
    root_parts = script_root.split("/")

    if path.split("/")[: len(root_parts)] == root_parts:
        # Strip the script root from the path.
        builder.path = path[len(script_root) :]
    else:
        # The new location is not under the script root, so use the
        # whole path and clear the previous root.
        builder.path = path
        builder.script_root = ""

    # Only 307 and 308 preserve all of the original request.
    keeps_request = response.status_code in {307, 308}

    if not keeps_request:
        # HEAD is preserved, everything else becomes GET.
        if builder.method != "HEAD":
            builder.method = "GET"

        # Clear the body and the headers that describe it.
        if builder.input_stream is not None:
            builder.input_stream.close()
            builder.input_stream = None

        builder.content_type = None
        builder.content_length = None
        builder.headers.pop("Transfer-Encoding", None)

    return self.open(builder, buffered=buffered)

1025 

def open(
    self,
    *args: t.Any,
    buffered: bool = False,
    follow_redirects: bool = False,
    **kwargs: t.Any,
) -> "TestResponse":
    """Build a request from the arguments, send it to the application,
    and return the wrapped response.

    :param args: Passed to :class:`EnvironBuilder` to create the
        environ for the request. A single positional argument with no
        keyword arguments may instead be an existing
        :class:`EnvironBuilder`, an environ dict, or a
        :class:`Request`.
    :param buffered: Convert the iterator returned by the app into
        a list. If the iterator has a ``close()`` method, it is
        called automatically.
    :param follow_redirects: Make additional requests to follow HTTP
        redirects until a non-redirect status is returned.
        :attr:`TestResponse.history` lists the intermediate
        responses.

    .. versionchanged:: 2.1
        Removed the ``as_tuple`` parameter.

    .. versionchanged:: 2.0
        ``as_tuple`` is deprecated and will be removed in Werkzeug
        2.1. Use :attr:`TestResponse.request` and
        ``request.environ`` instead.

    .. versionchanged:: 2.0
        The request input stream is closed when calling
        ``response.close()``. Input streams for redirects are
        automatically closed.

    .. versionchanged:: 0.5
        If a dict is provided as file in the dict for the ``data``
        parameter the content type has to be called ``content_type``
        instead of ``mimetype``. This change was made for
        consistency with :class:`werkzeug.FileWrapper`.

    .. versionchanged:: 0.5
        Added the ``follow_redirects`` parameter.
    """
    request: t.Optional["Request"] = None

    if len(args) == 1 and not kwargs:
        (single,) = args

        if isinstance(single, EnvironBuilder):
            request = single.get_request()
        elif isinstance(single, dict):
            request = EnvironBuilder.from_environ(single).get_request()
        elif isinstance(single, Request):
            request = single

    if request is None:
        builder = EnvironBuilder(*args, **kwargs)

        try:
            request = builder.get_request()
        finally:
            builder.close()

    app_result = self.run_wsgi_app(request.environ, buffered=buffered)
    response = self.response_wrapper(*app_result, request=request)

    if not follow_redirects:
        return response

    redirect_codes = {301, 302, 303, 305, 307, 308}
    seen = set()
    history: t.List["TestResponse"] = []

    while response.status_code in redirect_codes:
        # Exhaust intermediate response bodies to ensure middleware
        # that returns an iterator runs any cleanup code.
        if not buffered:
            response.make_sequence()
            response.close()

        hop = (response.location, response.status_code)

        if hop in seen:
            raise ClientRedirectError(
                f"Loop detected: A {response.status_code} redirect"
                f" to {response.location} was already made."
            )

        seen.add(hop)
        response.history = tuple(history)
        history.append(response)
        response = self.resolve_redirect(response, buffered=buffered)

    # This is the final request after redirects.
    response.history = tuple(history)
    # Close the input stream when closing the response, in case
    # the input is an open temporary file.
    response.call_on_close(request.input_stream.close)
    return response

1131 

def get(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Send a ``GET`` request through :meth:`open`.

    All arguments are forwarded; a ``method`` given by the caller is
    replaced with ``GET``.
    """
    return self.open(*args, **{**kw, "method": "GET"})

1136 

def post(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Send a ``POST`` request through :meth:`open`.

    All arguments are forwarded; a ``method`` given by the caller is
    replaced with ``POST``.
    """
    return self.open(*args, **{**kw, "method": "POST"})

1141 

def put(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Send a ``PUT`` request through :meth:`open`.

    All arguments are forwarded; a ``method`` given by the caller is
    replaced with ``PUT``.
    """
    return self.open(*args, **{**kw, "method": "PUT"})

1146 

def delete(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Send a ``DELETE`` request through :meth:`open`.

    All arguments are forwarded; a ``method`` given by the caller is
    replaced with ``DELETE``.
    """
    return self.open(*args, **{**kw, "method": "DELETE"})

1151 

def patch(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Send a ``PATCH`` request through :meth:`open`.

    All arguments are forwarded; a ``method`` given by the caller is
    replaced with ``PATCH``.
    """
    return self.open(*args, **{**kw, "method": "PATCH"})

1156 

def options(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Send an ``OPTIONS`` request through :meth:`open`.

    All arguments are forwarded; a ``method`` given by the caller is
    replaced with ``OPTIONS``.
    """
    return self.open(*args, **{**kw, "method": "OPTIONS"})

1161 

def head(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Send a ``HEAD`` request through :meth:`open`.

    All arguments are forwarded; a ``method`` given by the caller is
    replaced with ``HEAD``.
    """
    return self.open(*args, **{**kw, "method": "HEAD"})

1166 

def trace(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Send a ``TRACE`` request through :meth:`open`.

    All arguments are forwarded; a ``method`` given by the caller is
    replaced with ``TRACE``.
    """
    return self.open(*args, **{**kw, "method": "TRACE"})

1171 

def __repr__(self) -> str:
    """Return ``<ClassName application-repr>`` for debugging output."""
    return f"<{type(self).__name__} {self.application!r}>"

1174 

1175 

def create_environ(*args: t.Any, **kwargs: t.Any) -> "WSGIEnvironment":
    """Build a new WSGI environ dict from the given values.

    The first parameter should be the path of the request, which
    defaults to '/'. The second one can either be an absolute path (in
    that case the host is localhost:80) or a full path to the request
    with scheme, netloc, port and the path to the script.

    This accepts the same arguments as the :class:`EnvironBuilder`
    constructor. The temporary builder is closed before returning,
    whether or not building the environ succeeded.

    .. versionchanged:: 0.5
       This function is now a thin wrapper over :class:`EnvironBuilder` which
       was added in 0.5.  The `headers`, `environ_base`, `environ_overrides`
       and `charset` parameters were added.
    """
    env_builder = EnvironBuilder(*args, **kwargs)

    try:
        environ = env_builder.get_environ()
    finally:
        env_builder.close()

    return environ

1197 

1198 

def run_wsgi_app(
    app: "WSGIApplication", environ: "WSGIEnvironment", buffered: bool = False
) -> t.Tuple[t.Iterable[bytes], str, Headers]:
    """Call a WSGI application and return ``(app_iter, status, headers)``.

    This works best with applications that always return an iterator.
    Applications that use the ``write()`` callable returned by
    ``start_response`` are handled as well: written data is collected
    and placed in front of the iterator's output. If the output is not
    what you expect, set ``buffered`` to ``True`` to enforce buffering.

    If passed an invalid WSGI application the behavior of this function
    is undefined. Never pass non-conforming WSGI applications to it.

    :param app: the application to execute.
    :param buffered: set to `True` to enforce buffering.
    :return: tuple in the form ``(app_iter, status, headers)``
    """
    # Copy environ to ensure any mutations by the app (ProxyFix, for
    # example) don't affect subsequent requests (such as redirects).
    environ = _get_environ(environ).copy()
    started: t.Optional[t.Tuple[str, t.List[t.Tuple[str, str]]]] = None
    chunks: t.List[bytes] = []

    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal started

        if exc_info:
            try:
                raise exc_info[1].with_traceback(exc_info[2])
            finally:
                # Drop the reference so the traceback is not kept alive.
                exc_info = None

        started = (status, headers)
        # The list's ``append`` doubles as the WSGI ``write()`` callable.
        return chunks.append

    app_rv = app(environ, start_response)
    close_func = getattr(app_rv, "close", None)
    app_iter: t.Iterable[bytes] = iter(app_rv)

    if buffered:
        # When buffering, the application iterator is turned into a
        # regular list and the close call is emitted early.
        try:
            app_iter = list(app_iter)
        finally:
            if close_func is not None:
                close_func()
    else:
        # Otherwise pull items until the application has started the
        # response, then put everything collected so far in front of
        # what is left of the iterator.
        for piece in app_iter:
            chunks.append(piece)

            if started is not None:
                break

        if chunks:
            app_iter = chain(chunks, app_iter)

        # Chaining hides the original ``close``; restore it.
        replaced = app_iter is not app_rv

        if replaced and close_func is not None:
            app_iter = ClosingIterator(app_iter, close_func)

    status, headers = started  # type: ignore
    return app_iter, status, Headers(headers)

1269 

1270 

class TestResponse(Response):
    """:class:`~werkzeug.wrappers.Response` subclass that provides extra
    information about requests made with the test :class:`Client`.

    Test client requests will always return an instance of this class.
    If a custom response class is passed to the client, it is
    subclassed along with this to support test information.

    If the test request included large files, or if the application is
    serving a file, call :meth:`close` to close any open files and
    prevent Python showing a ``ResourceWarning``.

    .. versionchanged:: 2.1
        Removed deprecated behavior for treating the response instance
        as a tuple.

    .. versionadded:: 2.0
        Test client methods always return instances of this class.
    """

    request: Request
    """A request object with the environ used to make the request that
    resulted in this response.
    """

    history: t.Tuple["TestResponse", ...]
    """A tuple of intermediate responses. Populated when the test request
    is made with ``follow_redirects`` enabled.
    """

    # Tell Pytest to ignore this, it's not a test class.
    __test__ = False

    def __init__(
        self,
        response: t.Iterable[bytes],
        status: str,
        headers: Headers,
        request: Request,
        history: t.Tuple["TestResponse", ...] = (),
        **kwargs: t.Any,
    ) -> None:
        """Wrap the output of a WSGI application call.

        :param response: Iterable of body chunks returned by the app.
        :param status: Status string passed to ``start_response``.
        :param headers: Response headers.
        :param request: The request that resulted in this response.
        :param history: Intermediate redirect responses that led to
            this one; empty by default.
        :param kwargs: Passed on to the base response class.
        """
        super().__init__(response, status, headers, **kwargs)
        self.request = request
        self.history = history
        # NOTE(review): not read anywhere in this class; looks like a
        # leftover of the tuple behavior removed in 2.1 - confirm that
        # nothing else uses it before removing.
        self._compat_tuple = response, status, headers

    @cached_property
    def text(self) -> str:
        """The response data as text. A shortcut for
        ``response.get_data(as_text=True)``.

        .. versionadded:: 2.1
        """
        return self.get_data(as_text=True)