Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/test.py: 27%

573 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1import mimetypes 

2import sys 

3import typing as t 

4from collections import defaultdict 

5from datetime import datetime 

6from datetime import timedelta 

7from http.cookiejar import CookieJar 

8from io import BytesIO 

9from itertools import chain 

10from random import random 

11from tempfile import TemporaryFile 

12from time import time 

13from urllib.request import Request as _UrllibRequest 

14 

15from ._internal import _get_environ 

16from ._internal import _make_encode_wrapper 

17from ._internal import _wsgi_decoding_dance 

18from ._internal import _wsgi_encoding_dance 

19from .datastructures import Authorization 

20from .datastructures import CallbackDict 

21from .datastructures import CombinedMultiDict 

22from .datastructures import EnvironHeaders 

23from .datastructures import FileMultiDict 

24from .datastructures import Headers 

25from .datastructures import MultiDict 

26from .http import dump_cookie 

27from .http import dump_options_header 

28from .http import parse_options_header 

29from .sansio.multipart import Data 

30from .sansio.multipart import Epilogue 

31from .sansio.multipart import Field 

32from .sansio.multipart import File 

33from .sansio.multipart import MultipartEncoder 

34from .sansio.multipart import Preamble 

35from .urls import iri_to_uri 

36from .urls import url_encode 

37from .urls import url_fix 

38from .urls import url_parse 

39from .urls import url_unparse 

40from .urls import url_unquote 

41from .utils import cached_property 

42from .utils import get_content_type 

43from .wrappers.request import Request 

44from .wrappers.response import Response 

45from .wsgi import ClosingIterator 

46from .wsgi import get_current_url 

47 

48if t.TYPE_CHECKING: 

49 from _typeshed.wsgi import WSGIApplication 

50 from _typeshed.wsgi import WSGIEnvironment 

51 

52 

def stream_encode_multipart(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: t.Optional[str] = None,
    charset: str = "utf-8",
) -> t.Tuple[t.IO[bytes], int, str]:
    """Encode a dict of values (either strings or file descriptors or
    :class:`FileStorage` objects.) into a multipart encoded string stored
    in a file descriptor.

    :param data: Mapping of field names to values. A value with a ``read``
        attribute is streamed as a file part; anything else is converted
        with ``str()`` and sent as a plain field. A list value produces one
        part per item.
    :param use_tempfile: When true, the in-memory buffer is swapped for a
        :func:`~tempfile.TemporaryFile` once more than ``threshold`` bytes
        have been written.
    :param threshold: Number of bytes kept in memory before spilling to the
        temporary file. Only used when ``use_tempfile`` is true.
    :param boundary: Multipart boundary. Generated from the current time
        and a random number when omitted.
    :param charset: Charset used to encode plain field values.
    :return: ``(stream, length, boundary)`` where ``stream`` is rewound to
        position 0 and ``length`` is the number of bytes written.
    """
    if boundary is None:
        boundary = f"---------------WerkzeugFormPart_{time()}{random()}"

    stream: t.IO[bytes] = BytesIO()
    total_length = 0
    on_disk = False
    write_binary: t.Callable[[bytes], int]

    if use_tempfile:

        def write_binary(s: bytes) -> int:
            nonlocal stream, total_length, on_disk

            if on_disk:
                return stream.write(s)
            else:
                length = len(s)

                if length + total_length <= threshold:
                    stream.write(s)
                else:
                    # Crossing the threshold: move everything buffered so
                    # far, plus this chunk, into a temporary file and keep
                    # writing there from now on.
                    new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
                    new_stream.write(stream.getvalue())  # type: ignore
                    new_stream.write(s)
                    stream = new_stream
                    on_disk = True

                total_length += length
                return length

    else:
        write_binary = stream.write

    encoder = MultipartEncoder(boundary.encode())
    write_binary(encoder.send_event(Preamble(data=b"")))
    for key, value in _iter_data(data):
        reader = getattr(value, "read", None)
        if reader is not None:
            # File-like value. Prefer an explicit ``filename`` attribute and
            # fall back to the ``name`` that regular file objects carry.
            filename = getattr(value, "filename", getattr(value, "name", None))
            content_type = getattr(value, "content_type", None)
            if content_type is None:
                content_type = (
                    filename
                    and mimetypes.guess_type(filename)[0]
                    or "application/octet-stream"
                )
            # Plain file objects have no ``headers`` attribute, so fall back
            # to empty headers instead of raising AttributeError. Work on a
            # copy so the caller's object is not modified by encoding it.
            headers = Headers(getattr(value, "headers", None))
            headers.update([("Content-Type", content_type)])
            if filename is None:
                write_binary(encoder.send_event(Field(name=key, headers=headers)))
            else:
                write_binary(
                    encoder.send_event(
                        File(name=key, filename=filename, headers=headers)
                    )
                )
            # Stream the file content in 16 KiB chunks until it is exhausted.
            while True:
                chunk = reader(16384)

                if not chunk:
                    break

                write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
        else:
            if not isinstance(value, str):
                value = str(value)
            write_binary(encoder.send_event(Field(name=key, headers=Headers())))
            write_binary(
                encoder.send_event(Data(data=value.encode(charset), more_data=False))
            )

    write_binary(encoder.send_event(Epilogue(data=b"")))

    # The current position is the total number of bytes written; rewind so
    # the caller can read the body from the start.
    length = stream.tell()
    stream.seek(0)
    return stream, length, boundary

140 

141 

def encode_multipart(
    values: t.Mapping[str, t.Any],
    boundary: t.Optional[str] = None,
    charset: str = "utf-8",
) -> t.Tuple[str, bytes]:
    """In-memory variant of :func:`stream_encode_multipart`.

    The values are encoded without using a temporary file and the result
    is returned as a ``(boundary, data)`` tuple where ``data`` is bytes.
    """
    encoded = stream_encode_multipart(
        values, use_tempfile=False, boundary=boundary, charset=charset
    )
    # ``encoded`` is ``(stream, length, boundary)``; the length is not needed.
    body_stream = encoded[0]
    used_boundary = encoded[2]
    return used_boundary, body_stream.read()

154 

155 

class _TestCookieHeaders:
    """Adapter that lets cookielib look up headers by name.

    Wraps an iterable of ``(name, value)`` pairs and offers the
    ``getheaders`` / ``get_all`` lookups on top of it.
    """

    def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
        self.headers = headers

    def getheaders(self, name: str) -> t.Iterable[str]:
        """Return every value whose header name matches ``name``,
        ignoring case.
        """
        wanted = name.lower()
        return [value for key, value in self.headers if key.lower() == wanted]

    def get_all(
        self, name: str, default: t.Optional[t.Iterable[str]] = None
    ) -> t.Iterable[str]:
        """Like :meth:`getheaders`, but return ``default`` when no header
        with that name is present.
        """
        found = self.getheaders(name)
        return found if found else default  # type: ignore

179 

180 

class _TestCookieResponse:
    """Minimal stand-in for an ``httplib.HTTPResponse``.

    It only exists so the headers of a test response can be handed to
    cookielib, which asks the response for its headers through
    :meth:`info`.
    """

    def __init__(self, headers: t.Union[Headers, t.List[t.Tuple[str, str]]]) -> None:
        # Wrap the raw headers once; ``info`` hands out this same adapter.
        wrapped = _TestCookieHeaders(headers)
        self.headers = wrapped

    def info(self) -> _TestCookieHeaders:
        """Return the wrapped headers adapter."""
        return self.headers

191 

192 

class _TestCookieJar(CookieJar):
    """A cookielib.CookieJar that can write its cookies into a WSGI
    environ and pick up new cookies from a WSGI application's response
    headers.
    """

    def inject_wsgi(self, environ: "WSGIEnvironment") -> None:
        """Write the stored cookies into ``environ`` as the client's
        ``Cookie`` header, or remove that header when the jar is empty.
        """
        cookie_header = "; ".join(f"{c.name}={c.value}" for c in self)

        if not cookie_header:
            environ.pop("HTTP_COOKIE", None)
            return

        environ["HTTP_COOKIE"] = cookie_header

    def extract_wsgi(
        self,
        environ: "WSGIEnvironment",
        headers: t.Union[Headers, t.List[t.Tuple[str, str]]],
    ) -> None:
        """Store the cookies set by the server's response ``headers`` in
        the jar, using the URL described by ``environ`` as the request URL.
        """
        fake_response = _TestCookieResponse(headers)
        fake_request = _UrllibRequest(get_current_url(environ))
        self.extract_cookies(fake_response, fake_request)  # type: ignore

221 

222 

def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[t.Tuple[str, t.Any]]:
    """Yield every ``(key, value)`` pair of a mapping whose values may be
    lists of values.

    Similar to iter_multi_items, except that only lists are expanded.
    Tuples are yielded unchanged so they can be used to describe files.
    """
    if isinstance(data, MultiDict):
        yield from data.items(multi=True)
        return

    for key, value in data.items():
        if not isinstance(value, list):
            yield key, value
            continue

        for item in value:
            yield key, item

237 

238 

# Type variable bound to MultiDict; used by ``EnvironBuilder._get_form`` so the
# return type matches the storage class that was passed in.
_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound=MultiDict)

240 

241 

class EnvironBuilder:
    """This class can be used to conveniently create a WSGI environment
    for testing purposes.  It can be used to quickly create WSGI environments
    or request objects from arbitrary data.

    The signature of this class is also used in some other places as of
    Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
    :meth:`Client.open`).  Because of this most of the functionality is
    available through the constructor alone.

    Files and regular form data can be manipulated independently of each
    other with the :attr:`form` and :attr:`files` attributes, but are
    passed with the same argument to the constructor: `data`.

    `data` can be any of these values:

    -   a `str` or `bytes` object: The object is converted into an
        :attr:`input_stream`, the :attr:`content_length` is set and you have to
        provide a :attr:`content_type`.
    -   a `dict` or :class:`MultiDict`: The keys have to be strings. The values
        have to be either any of the following objects, or a list of any of the
        following objects:

        -   a :class:`file`-like object:  These are converted into
            :class:`FileStorage` objects automatically.
        -   a `tuple`:  The :meth:`~FileMultiDict.add_file` method is called
            with the key and the unpacked `tuple` items as positional
            arguments.
        -   a `str`:  The string is set as form data for the associated key.
    -   a file-like object: The object content is loaded in memory and then
        handled like a regular `str` or a `bytes`.

    :param path: the path of the request.  In the WSGI environment this will
                 end up as `PATH_INFO`.  If the `query_string` is not defined
                 and there is a question mark in the `path` everything after
                 it is used as query string.
    :param base_url: the base URL is a URL that is used to extract the WSGI
                     URL scheme, host (server name + server port) and the
                     script root (`SCRIPT_NAME`).
    :param query_string: an optional string or dict with URL parameters.
    :param method: the HTTP method to use, defaults to `GET`.
    :param input_stream: an optional input stream.  Do not specify this and
                         `data`.  As soon as an input stream is set you can't
                         modify :attr:`args` and :attr:`files` unless you
                         set the :attr:`input_stream` to `None` again.
    :param content_type: The content type for the request.  As of 0.5 you
                         don't have to provide this when specifying files
                         and form data via `data`.
    :param content_length: The content length for the request.  You don't
                           have to specify this when providing data via
                           `data`.
    :param errors_stream: an optional error stream that is used for
                          `wsgi.errors`.  Defaults to :data:`stderr`.
    :param multithread: controls `wsgi.multithread`.  Defaults to `False`.
    :param multiprocess: controls `wsgi.multiprocess`.  Defaults to `False`.
    :param run_once: controls `wsgi.run_once`.  Defaults to `False`.
    :param headers: an optional list or :class:`Headers` object of headers.
    :param data: a string or dict of form data or a file-object.
                 See explanation above.
    :param json: An object to be serialized and assigned to ``data``.
        Defaults the content type to ``"application/json"``.
        Serialized with the function assigned to :attr:`json_dumps`.
    :param environ_base: an optional dict of environment defaults.
    :param environ_overrides: an optional dict of environment overrides.
    :param charset: the charset used to encode string data.
    :param mimetype: An optional mimetype. When given it is assigned to
        :attr:`mimetype` last, which sets the content type from it and
        ``charset``.
    :param auth: An authorization object to use for the
        ``Authorization`` header value. A ``(username, password)`` tuple
        is a shortcut for ``Basic`` authorization.

    .. versionchanged:: 2.1
        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
        header keys in the environ.

    .. versionchanged:: 2.0
        ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
        the query string, not only the path.

    .. versionchanged:: 2.0
        The default :attr:`request_class` is ``Request`` instead of
        ``BaseRequest``.

    .. versionadded:: 2.0
       Added the ``auth`` parameter.

    .. versionadded:: 0.15
        The ``json`` param and :meth:`json_dumps` method.

    .. versionadded:: 0.15
        The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
        the path before percent-decoding. This is not part of the WSGI
        PEP, but many WSGI servers include it.

    .. versionchanged:: 0.6
       ``path`` and ``base_url`` can now be unicode strings that are
       encoded with :func:`iri_to_uri`.
    """

    #: the server protocol to use.  defaults to HTTP/1.1
    server_protocol = "HTTP/1.1"

    #: the wsgi version to use.  defaults to (1, 0)
    wsgi_version = (1, 0)

    #: The default request class used by :meth:`get_request`.
    request_class = Request

    # Imported inside the class body and deleted again below so that only
    # ``json_dumps`` remains as a class attribute.
    import json

    #: The serialization function used when ``json`` is passed.
    json_dumps = staticmethod(json.dumps)
    del json

    # Backing storage for the ``args``, ``query_string``, ``input_stream``,
    # ``form`` and ``files`` properties.
    _args: t.Optional[MultiDict]
    _query_string: t.Optional[str]
    _input_stream: t.Optional[t.IO[bytes]]
    _form: t.Optional[MultiDict]
    _files: t.Optional[FileMultiDict]

    def __init__(
        self,
        path: str = "/",
        base_url: t.Optional[str] = None,
        query_string: t.Optional[t.Union[t.Mapping[str, str], str]] = None,
        method: str = "GET",
        input_stream: t.Optional[t.IO[bytes]] = None,
        content_type: t.Optional[str] = None,
        content_length: t.Optional[int] = None,
        errors_stream: t.Optional[t.IO[str]] = None,
        multithread: bool = False,
        multiprocess: bool = False,
        run_once: bool = False,
        headers: t.Optional[t.Union[Headers, t.Iterable[t.Tuple[str, str]]]] = None,
        data: t.Optional[
            t.Union[t.IO[bytes], str, bytes, t.Mapping[str, t.Any]]
        ] = None,
        environ_base: t.Optional[t.Mapping[str, t.Any]] = None,
        environ_overrides: t.Optional[t.Mapping[str, t.Any]] = None,
        charset: str = "utf-8",
        mimetype: t.Optional[str] = None,
        json: t.Optional[t.Mapping[str, t.Any]] = None,
        auth: t.Optional[t.Union[Authorization, t.Tuple[str, str]]] = None,
    ) -> None:
        """Set up the builder. See the class docstring for the parameters.

        :raises ValueError: if a query string is given both inside ``path``
            and as ``query_string``.
        :raises TypeError: if both ``json`` and ``data``, or both
            ``input_stream`` and ``data``, are provided.
        """
        path_s = _make_encode_wrapper(path)
        if query_string is not None and path_s("?") in path:
            raise ValueError("Query string is defined in the path and as an argument")
        request_uri = url_parse(path)
        if query_string is None and path_s("?") in path:
            query_string = request_uri.query
        self.charset = charset
        self.path = iri_to_uri(request_uri.path)
        self.request_uri = path
        if base_url is not None:
            base_url = url_fix(iri_to_uri(base_url, charset), charset)
        # The setter splits this into url_scheme, host and script_root.
        self.base_url = base_url  # type: ignore
        if isinstance(query_string, (bytes, str)):
            self.query_string = query_string
        else:
            if query_string is None:
                query_string = MultiDict()
            elif not isinstance(query_string, MultiDict):
                query_string = MultiDict(query_string)
            self.args = query_string
        self.method = method
        if headers is None:
            headers = Headers()
        elif not isinstance(headers, Headers):
            headers = Headers(headers)
        self.headers = headers
        if content_type is not None:
            self.content_type = content_type
        if errors_stream is None:
            errors_stream = sys.stderr
        self.errors_stream = errors_stream
        self.multithread = multithread
        self.multiprocess = multiprocess
        self.run_once = run_once
        self.environ_base = environ_base
        self.environ_overrides = environ_overrides
        # Assigning the input stream also resets the form and files storage.
        self.input_stream = input_stream
        self.content_length = content_length
        self.closed = False

        if auth is not None:
            if isinstance(auth, tuple):
                auth = Authorization(
                    "basic", {"username": auth[0], "password": auth[1]}
                )

            self.headers.set("Authorization", auth.to_header())

        if json is not None:
            if data is not None:
                raise TypeError("can't provide both json and data")

            data = self.json_dumps(json)

            if self.content_type is None:
                self.content_type = "application/json"

        if data:
            if input_stream is not None:
                raise TypeError("can't provide input stream and data")
            if hasattr(data, "read"):
                data = data.read()
            if isinstance(data, str):
                data = data.encode(self.charset)
            if isinstance(data, bytes):
                self.input_stream = BytesIO(data)
                if self.content_length is None:
                    self.content_length = len(data)
            else:
                # A mapping: tuples, dicts and file-like values become
                # files, everything else is stored as a form value.
                for key, value in _iter_data(data):
                    if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
                        self._add_file_from_data(key, value)
                    else:
                        self.form.setlistdefault(key).append(value)

        if mimetype is not None:
            self.mimetype = mimetype

    @classmethod
    def from_environ(
        cls, environ: "WSGIEnvironment", **kwargs: t.Any
    ) -> "EnvironBuilder":
        """Turn an environ dict back into a builder. Any extra kwargs
        override the args extracted from the environ.

        The ``Host`` header is popped from the environ's headers without
        a default, so the environ is expected to carry one.

        .. versionchanged:: 2.0
            Path and query values are passed through the WSGI decoding
            dance to avoid double encoding.

        .. versionadded:: 0.15
        """
        headers = Headers(EnvironHeaders(environ))
        out = {
            "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
            "base_url": cls._make_base_url(
                environ["wsgi.url_scheme"],
                headers.pop("Host"),
                _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
            ),
            "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
            "method": environ["REQUEST_METHOD"],
            "input_stream": environ["wsgi.input"],
            "content_type": headers.pop("Content-Type", None),
            "content_length": headers.pop("Content-Length", None),
            "errors_stream": environ["wsgi.errors"],
            "multithread": environ["wsgi.multithread"],
            "multiprocess": environ["wsgi.multiprocess"],
            "run_once": environ["wsgi.run_once"],
            "headers": headers,
        }
        out.update(kwargs)
        return cls(**out)

    def _add_file_from_data(
        self,
        key: str,
        value: t.Union[
            t.IO[bytes], t.Tuple[t.IO[bytes], str], t.Tuple[t.IO[bytes], str, str]
        ],
    ) -> None:
        """Called in the EnvironBuilder to add files from the data dict.

        A tuple is unpacked into positional arguments for
        :meth:`~FileMultiDict.add_file`; any other value is passed as is.
        """
        if isinstance(value, tuple):
            self.files.add_file(key, *value)
        else:
            self.files.add_file(key, value)

    @staticmethod
    def _make_base_url(scheme: str, host: str, script_root: str) -> str:
        """Join scheme, host and script root into a URL that ends with
        exactly one slash.
        """
        return url_unparse((scheme, host, script_root, "", "")).rstrip("/") + "/"

    @property
    def base_url(self) -> str:
        """The base URL is used to extract the URL scheme, host name,
        port, and root path.
        """
        return self._make_base_url(self.url_scheme, self.host, self.script_root)

    @base_url.setter
    def base_url(self, value: t.Optional[str]) -> None:
        # ``None`` falls back to ``http://localhost`` with an empty script
        # root. A query string or fragment in the URL is rejected.
        if value is None:
            scheme = "http"
            netloc = "localhost"
            script_root = ""
        else:
            scheme, netloc, script_root, qs, anchor = url_parse(value)
            if qs or anchor:
                raise ValueError("base url must not contain a query string or fragment")
        self.script_root = script_root.rstrip("/")
        self.host = netloc
        self.url_scheme = scheme

    @property
    def content_type(self) -> t.Optional[str]:
        """The content type for the request.  Reflected from and to
        the :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        ct = self.headers.get("Content-Type")
        if ct is None and not self._input_stream:
            # No explicit header and no input stream: detect the type from
            # whichever of files / form has content.
            if self._files:
                return "multipart/form-data"
            if self._form:
                return "application/x-www-form-urlencoded"
            return None
        return ct

    @content_type.setter
    def content_type(self, value: t.Optional[str]) -> None:
        # ``None`` removes the header instead of storing it.
        if value is None:
            self.headers.pop("Content-Type", None)
        else:
            self.headers["Content-Type"] = value

    @property
    def mimetype(self) -> t.Optional[str]:
        """The mimetype (content type without charset etc.)

        .. versionadded:: 0.14
        """
        ct = self.content_type
        return ct.split(";")[0].strip() if ct else None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        self.content_type = get_content_type(value, self.charset)

    @property
    def mimetype_params(self) -> t.Mapping[str, str]:
        """The mimetype parameters as dict.  For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        Changes made to the returned dict are written back to the
        ``Content-Type`` header.

        .. versionadded:: 0.14
        """

        def on_update(d: CallbackDict) -> None:
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        d = parse_options_header(self.headers.get("content-type", ""))[1]
        return CallbackDict(d, on_update)

    @property
    def content_length(self) -> t.Optional[int]:
        """The content length as integer.  Reflected from and to the
        :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        return self.headers.get("Content-Length", type=int)

    @content_length.setter
    def content_length(self, value: t.Optional[int]) -> None:
        # ``None`` removes the header; other values are stored as a string.
        if value is None:
            self.headers.pop("Content-Length", None)
        else:
            self.headers["Content-Length"] = str(value)

    def _get_form(self, name: str, storage: t.Type[_TAnyMultiDict]) -> _TAnyMultiDict:
        """Common behavior for getting the :attr:`form` and
        :attr:`files` properties.

        :param name: Name of the internal cached attribute.
        :param storage: Storage class used for the data.
        :raises AttributeError: if an input stream is defined.
        """
        if self.input_stream is not None:
            raise AttributeError("an input stream is defined")

        rv = getattr(self, name)

        if rv is None:
            # Create the storage lazily on first access.
            rv = storage()
            setattr(self, name, rv)

        return rv  # type: ignore

    def _set_form(self, name: str, value: MultiDict) -> None:
        """Common behavior for setting the :attr:`form` and
        :attr:`files` properties. Clears the input stream.

        :param name: Name of the internal cached attribute.
        :param value: Value to assign to the attribute.
        """
        self._input_stream = None
        setattr(self, name, value)

    @property
    def form(self) -> MultiDict:
        """A :class:`MultiDict` of form values."""
        return self._get_form("_form", MultiDict)

    @form.setter
    def form(self, value: MultiDict) -> None:
        self._set_form("_form", value)

    @property
    def files(self) -> FileMultiDict:
        """A :class:`FileMultiDict` of uploaded files. Use
        :meth:`~FileMultiDict.add_file` to add new files.
        """
        return self._get_form("_files", FileMultiDict)

    @files.setter
    def files(self, value: FileMultiDict) -> None:
        self._set_form("_files", value)

    @property
    def input_stream(self) -> t.Optional[t.IO[bytes]]:
        """An optional input stream. This is mutually exclusive with
        setting :attr:`form` and :attr:`files`, setting it will clear
        those. Do not provide this if the method is not ``POST`` or
        another method that has a body.
        """
        return self._input_stream

    @input_stream.setter
    def input_stream(self, value: t.Optional[t.IO[bytes]]) -> None:
        self._input_stream = value
        self._form = None
        self._files = None

    @property
    def query_string(self) -> str:
        """The query string.  If you set this to a string
        :attr:`args` will no longer be available.
        """
        if self._query_string is None:
            if self._args is not None:
                return url_encode(self._args, charset=self.charset)
            return ""
        return self._query_string

    @query_string.setter
    def query_string(self, value: t.Optional[str]) -> None:
        self._query_string = value
        self._args = None

    @property
    def args(self) -> MultiDict:
        """The URL arguments as :class:`MultiDict`.

        :raises AttributeError: if a query string is defined.
        """
        if self._query_string is not None:
            raise AttributeError("a query string is defined")
        if self._args is None:
            self._args = MultiDict()
        return self._args

    @args.setter
    def args(self, value: t.Optional[MultiDict]) -> None:
        self._query_string = None
        self._args = value

    @property
    def server_name(self) -> str:
        """The server name (read-only, use :attr:`host` to set)"""
        return self.host.split(":", 1)[0]

    @property
    def server_port(self) -> int:
        """The server port as integer (read-only, use :attr:`host` to set)"""
        pieces = self.host.split(":", 1)

        if len(pieces) == 2:
            try:
                return int(pieces[1])
            except ValueError:
                # Not a numeric port; use the scheme default below.
                pass

        if self.url_scheme == "https":
            return 443
        return 80

    def __del__(self) -> None:
        # Best effort only: never let cleanup errors escape a finalizer.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """Closes all files.  If you put real :class:`file` objects into the
        :attr:`files` dict you can call this method to automatically close
        them all in one go.
        """
        if self.closed:
            return
        try:
            # ``MultiDict.values()`` only yields the first value for each
            # key, which would leave additional files stored under the
            # same key open. Flatten the per-key lists to reach all of them.
            files = list(chain.from_iterable(self.files.listvalues()))
        except AttributeError:
            # Accessing ``files`` raises AttributeError while an input
            # stream is defined; there is nothing to close in that case.
            files = []
        for f in files:
            try:
                f.close()
            except Exception:
                # Keep closing the remaining files even if one fails.
                pass
        self.closed = True

    def get_environ(self) -> "WSGIEnvironment":
        """Return the built environ.

        .. versionchanged:: 0.15
            The content type and length headers are set based on
            input stream detection. Previously this only set the WSGI
            keys.
        """
        input_stream = self.input_stream
        content_length = self.content_length

        mimetype = self.mimetype
        content_type = self.content_type

        if input_stream is not None:
            # Measure what is left to read from the current position,
            # then restore the position.
            start_pos = input_stream.tell()
            input_stream.seek(0, 2)
            end_pos = input_stream.tell()
            input_stream.seek(start_pos)
            content_length = end_pos - start_pos
        elif mimetype == "multipart/form-data":
            input_stream, content_length, boundary = stream_encode_multipart(
                CombinedMultiDict([self.form, self.files]), charset=self.charset
            )
            content_type = f'{mimetype}; boundary="{boundary}"'
        elif mimetype == "application/x-www-form-urlencoded":
            form_encoded = url_encode(self.form, charset=self.charset).encode("ascii")
            content_length = len(form_encoded)
            input_stream = BytesIO(form_encoded)
        else:
            input_stream = BytesIO()

        result: "WSGIEnvironment" = {}
        if self.environ_base:
            result.update(self.environ_base)

        def _path_encode(x: str) -> str:
            return _wsgi_encoding_dance(url_unquote(x, self.charset), self.charset)

        raw_uri = _wsgi_encoding_dance(self.request_uri, self.charset)
        result.update(
            {
                "REQUEST_METHOD": self.method,
                "SCRIPT_NAME": _path_encode(self.script_root),
                "PATH_INFO": _path_encode(self.path),
                "QUERY_STRING": _wsgi_encoding_dance(self.query_string, self.charset),
                # Non-standard, added by mod_wsgi, uWSGI
                "REQUEST_URI": raw_uri,
                # Non-standard, added by gunicorn
                "RAW_URI": raw_uri,
                "SERVER_NAME": self.server_name,
                "SERVER_PORT": str(self.server_port),
                "HTTP_HOST": self.host,
                "SERVER_PROTOCOL": self.server_protocol,
                "wsgi.version": self.wsgi_version,
                "wsgi.url_scheme": self.url_scheme,
                "wsgi.input": input_stream,
                "wsgi.errors": self.errors_stream,
                "wsgi.multithread": self.multithread,
                "wsgi.multiprocess": self.multiprocess,
                "wsgi.run_once": self.run_once,
            }
        )

        headers = self.headers.copy()
        # Don't send these as headers, they're part of the environ.
        headers.remove("Content-Type")
        headers.remove("Content-Length")

        if content_type is not None:
            result["CONTENT_TYPE"] = content_type

        if content_length is not None:
            result["CONTENT_LENGTH"] = str(content_length)

        # Headers that occur more than once are joined into a single
        # comma separated ``HTTP_*`` value.
        combined_headers = defaultdict(list)

        for key, value in headers.to_wsgi_list():
            combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)

        for key, values in combined_headers.items():
            result[key] = ", ".join(values)

        if self.environ_overrides:
            result.update(self.environ_overrides)

        return result

    def get_request(self, cls: t.Optional[t.Type[Request]] = None) -> Request:
        """Returns a request with the data.  If the request class is not
        specified :attr:`request_class` is used.

        :param cls: The request wrapper to use.
        """
        if cls is None:
            cls = self.request_class

        return cls(self.get_environ())

835 

836 

class ClientRedirectError(Exception):
    """If a redirect loop is detected when using follow_redirects=True with
    the :class:`Client`, then this exception is raised.
    """

841 

842 

843class Client: 

844 """This class allows you to send requests to a wrapped application. 

845 

846 The use_cookies parameter indicates whether cookies should be stored and 

847 sent for subsequent requests. This is True by default, but passing False 

848 will disable this behaviour. 

849 

850 If you want to request some subdomain of your application you may set 

851 `allow_subdomain_redirects` to `True` as if not no external redirects 

852 are allowed. 

853 

854 .. versionchanged:: 2.1 

855 Removed deprecated behavior of treating the response as a 

856 tuple. All data is available as properties on the returned 

857 response object. 

858 

859 .. versionchanged:: 2.0 

860 ``response_wrapper`` is always a subclass of 

861 :class:``TestResponse``. 

862 

863 .. versionchanged:: 0.5 

864 Added the ``use_cookies`` parameter. 

865 """ 

866 

def __init__(
    self,
    application: "WSGIApplication",
    response_wrapper: t.Optional[t.Type["Response"]] = None,
    use_cookies: bool = True,
    allow_subdomain_redirects: bool = False,
) -> None:
    """Create a test client for ``application``.

    :param application: The WSGI application to send requests to.
    :param response_wrapper: Response class to wrap results in. ``None``
        or ``Response`` selects ``TestResponse``. A ``TestResponse``
        subclass is used unchanged. Any other class is combined with
        ``TestResponse`` into a new ``WrapperTestResponse`` class.
    :param use_cookies: Whether to keep a cookie jar on the client.
        When false, :attr:`cookie_jar` is ``None``.
    :param allow_subdomain_redirects: Stored on the client as
        :attr:`allow_subdomain_redirects`.
    """
    self.application = application

    if response_wrapper is None or response_wrapper is Response:
        response_wrapper = TestResponse
    elif not issubclass(response_wrapper, TestResponse):
        # ``response_wrapper`` is a class, so this has to be a subclass
        # check. An ``isinstance`` check is never true for a class, which
        # sent ``TestResponse`` subclasses into this branch and made
        # ``type()`` fail with an inconsistent MRO.
        response_wrapper = type(
            "WrapperTestResponse",
            (TestResponse, response_wrapper),  # type: ignore
            {},
        )

    self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper)

    if use_cookies:
        self.cookie_jar: t.Optional[_TestCookieJar] = _TestCookieJar()
    else:
        self.cookie_jar = None

    self.allow_subdomain_redirects = allow_subdomain_redirects

893 

def set_cookie(
    self,
    server_name: str,
    key: str,
    value: str = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: str = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    samesite: t.Optional[str] = None,
    charset: str = "utf-8",
) -> None:
    """Store a cookie in the client's cookie jar.

    ``server_name`` is required and must be the same server name that is
    later passed to the open call. The cookie is stored by building a
    ``Set-Cookie`` header and feeding it to the jar as if it came from a
    response for ``http://{server_name}``.
    """
    jar = self.cookie_jar
    assert jar is not None, "cookies disabled"

    set_cookie_value = dump_cookie(
        key,
        value,
        max_age,
        expires,
        path,
        domain,
        secure,
        httponly,
        charset,
        samesite=samesite,
    )
    fake_environ = create_environ(path, base_url=f"http://{server_name}")
    jar.extract_wsgi(fake_environ, [("Set-Cookie", set_cookie_value)])

928 

def delete_cookie(
    self,
    server_name: str,
    key: str,
    path: str = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    samesite: t.Optional[str] = None,
) -> None:
    """Remove a cookie from the test client.

    This delegates to :meth:`set_cookie` with ``expires`` and
    ``max_age`` both set to ``0``; the remaining arguments are passed
    through unchanged.
    """
    cookie_scope = {
        "path": path,
        "domain": domain,
        "secure": secure,
        "httponly": httponly,
        "samesite": samesite,
    }
    self.set_cookie(server_name, key, expires=0, max_age=0, **cookie_scope)

951 

def run_wsgi_app(
    self, environ: "WSGIEnvironment", buffered: bool = False
) -> t.Tuple[t.Iterable[bytes], str, Headers]:
    """Call the wrapped WSGI application with ``environ``.

    When the client has a cookie jar, the jar is applied to ``environ``
    before the call and is given the response headers afterwards. The
    value returned by the module-level ``run_wsgi_app`` is passed back
    unchanged.

    :meta private:
    """
    if self.cookie_jar is not None:
        self.cookie_jar.inject_wsgi(environ)

    result = run_wsgi_app(self.application, environ, buffered=buffered)

    if self.cookie_jar is not None:
        # The third item of the result holds the response headers.
        response_headers = result[2]
        self.cookie_jar.extract_wsgi(environ, response_headers)

    return result

968 

def resolve_redirect(
    self, response: "TestResponse", buffered: bool = False
) -> "TestResponse":
    """Issue the follow-up request for a redirect response.

    A new environ builder is derived from the environ of the request
    that produced ``response`` and adjusted to point at
    ``response.location``. The result of opening it is returned.

    :raises RuntimeError: If the location points at a different server
        name that is not a subdomain of the current one, or at a
        subdomain while ``allow_subdomain_redirects`` is false.

    :meta private:
    """
    scheme, netloc, path, query, _fragment = url_parse(response.location)
    builder = EnvironBuilder.from_environ(
        response.request.environ, path=path, query_string=query
    )

    target_host = netloc.split(":", 1)[0].split(".")
    current_host = builder.server_name.split(".")

    if target_host == [""]:
        # A local redirect with autocorrect_location_header=False
        # doesn't have a host, so use the request's host.
        target_host = current_host
    else:
        # The new location has a host, use it for the base URL.
        builder.url_scheme = scheme
        builder.host = netloc

    # Explain why a redirect to a different server name won't be followed.
    if target_host != current_host:
        is_subdomain = target_host[-len(current_host) :] == current_host

        if not is_subdomain:
            raise RuntimeError("Following external redirects is not supported.")

        if not self.allow_subdomain_redirects:
            raise RuntimeError("Following subdomain redirects is not enabled.")

    path_segments = path.split("/")
    root_segments = builder.script_root.split("/")

    if path_segments[: len(root_segments)] != root_segments:
        # The new location is not under the script root, so use the
        # whole path and clear the previous root.
        builder.path = path
        builder.script_root = ""
    else:
        # Strip the script root from the path.
        builder.path = path[len(builder.script_root) :]

    # Only 307 and 308 preserve all of the original request.
    keeps_original_request = response.status_code in {307, 308}

    if not keeps_original_request:
        # HEAD is preserved, everything else becomes GET.
        if builder.method != "HEAD":
            builder.method = "GET"

        # Clear the body and the headers that describe it.
        if builder.input_stream is not None:
            builder.input_stream.close()
            builder.input_stream = None

        builder.content_type = None
        builder.content_length = None
        builder.headers.pop("Transfer-Encoding", None)

    return self.open(builder, buffered=buffered)

1031 

def open(
    self,
    *args: t.Any,
    buffered: bool = False,
    follow_redirects: bool = False,
    **kwargs: t.Any,
) -> "TestResponse":
    """Build a request from the arguments, send it to the application
    and return the wrapped response.

    :param args: Passed to :class:`EnvironBuilder` to create the
        environ for the request. A single positional argument with no
        keyword arguments may instead be an existing
        :class:`EnvironBuilder`, an environ dict, or a ``Request``.
    :param buffered: Convert the iterator returned by the app into
        a list. If the iterator has a ``close()`` method, it is
        called automatically.
    :param follow_redirects: Make additional requests to follow HTTP
        redirects until a non-redirect status is returned.
        :attr:`TestResponse.history` lists the intermediate
        responses.
    :raises ClientRedirectError: If the same location and status code
        are seen twice while following redirects.

    .. versionchanged:: 2.1
        Removed the ``as_tuple`` parameter.

    .. versionchanged:: 2.0
        ``as_tuple`` is deprecated and will be removed in Werkzeug
        2.1. Use :attr:`TestResponse.request` and
        ``request.environ`` instead.

    .. versionchanged:: 2.0
        The request input stream is closed when calling
        ``response.close()``. Input streams for redirects are
        automatically closed.

    .. versionchanged:: 0.5
        If a dict is provided as file in the dict for the ``data``
        parameter the content type has to be called ``content_type``
        instead of ``mimetype``. This change was made for
        consistency with :class:`werkzeug.FileWrapper`.

    .. versionchanged:: 0.5
        Added the ``follow_redirects`` parameter.
    """
    request: t.Optional["Request"] = None

    if len(args) == 1 and not kwargs:
        (single,) = args

        if isinstance(single, EnvironBuilder):
            request = single.get_request()
        elif isinstance(single, dict):
            request = EnvironBuilder.from_environ(single).get_request()
        elif isinstance(single, Request):
            request = single

    if request is None:
        builder = EnvironBuilder(*args, **kwargs)

        try:
            request = builder.get_request()
        finally:
            builder.close()

    app_result = self.run_wsgi_app(request.environ, buffered=buffered)
    response = self.response_wrapper(*app_result, request=request)

    if not follow_redirects:
        return response

    redirect_codes = {301, 302, 303, 305, 307, 308}
    seen_redirects = set()
    history: t.List["TestResponse"] = []

    while response.status_code in redirect_codes:
        # Exhaust intermediate response bodies to ensure middleware
        # that returns an iterator runs any cleanup code.
        if not buffered:
            response.make_sequence()
            response.close()

        redirect_key = (response.location, response.status_code)

        if redirect_key in seen_redirects:
            raise ClientRedirectError(
                f"Loop detected: A {response.status_code} redirect"
                f" to {response.location} was already made."
            )

        seen_redirects.add(redirect_key)
        response.history = tuple(history)
        history.append(response)
        response = self.resolve_redirect(response, buffered=buffered)

    # The loop is only left normally once a non-redirect response was
    # received, so this is the final request after redirects.
    response.history = tuple(history)
    # Close the input stream when closing the response, in case
    # the input is an open temporary file.
    response.call_on_close(request.input_stream.close)
    return response

1137 

def get(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Forward to :meth:`open`, forcing ``method`` to ``GET``."""
    return self.open(*args, **{**kw, "method": "GET"})

1142 

def post(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Forward to :meth:`open`, forcing ``method`` to ``POST``."""
    return self.open(*args, **{**kw, "method": "POST"})

1147 

def put(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Forward to :meth:`open`, forcing ``method`` to ``PUT``."""
    return self.open(*args, **{**kw, "method": "PUT"})

1152 

def delete(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Forward to :meth:`open`, forcing ``method`` to ``DELETE``."""
    return self.open(*args, **{**kw, "method": "DELETE"})

1157 

def patch(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Forward to :meth:`open`, forcing ``method`` to ``PATCH``."""
    return self.open(*args, **{**kw, "method": "PATCH"})

1162 

def options(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Forward to :meth:`open`, forcing ``method`` to ``OPTIONS``."""
    return self.open(*args, **{**kw, "method": "OPTIONS"})

1167 

def head(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Forward to :meth:`open`, forcing ``method`` to ``HEAD``."""
    return self.open(*args, **{**kw, "method": "HEAD"})

1172 

def trace(self, *args: t.Any, **kw: t.Any) -> "TestResponse":
    """Forward to :meth:`open`, forcing ``method`` to ``TRACE``."""
    return self.open(*args, **{**kw, "method": "TRACE"})

1177 

def __repr__(self) -> str:
    """Show the concrete class name and the repr of the wrapped application."""
    class_name = type(self).__name__
    return "<{} {!r}>".format(class_name, self.application)

1180 

1181 

def create_environ(*args: t.Any, **kwargs: t.Any) -> "WSGIEnvironment":
    """Build a WSGI environ dict from the given values.

    The first parameter should be the path of the request which defaults
    to '/'. The second one can either be an absolute path (in that case
    the host is localhost:80) or a full path to the request with scheme,
    netloc port and the path to the script.

    All arguments are handed to the :class:`EnvironBuilder` constructor,
    so the same arguments are accepted. The builder is closed before
    this function returns, whether or not building the environ succeeds.

    .. versionchanged:: 0.5
       This function is now a thin wrapper over :class:`EnvironBuilder` which
       was added in 0.5.  The `headers`, `environ_base`, `environ_overrides`
       and `charset` parameters were added.
    """
    builder = EnvironBuilder(*args, **kwargs)

    try:
        environ = builder.get_environ()
    finally:
        builder.close()

    return environ

1203 

1204 

def run_wsgi_app(
    app: "WSGIApplication", environ: "WSGIEnvironment", buffered: bool = False
) -> t.Tuple[t.Iterable[bytes], str, Headers]:
    """Call a WSGI application and return ``(app_iter, status, headers)``.

    This works best if the application always returns an iterator.

    Some applications use the `write()` callable returned by
    `start_response` instead. Data written that way is collected and,
    in the unbuffered case, placed in front of the remaining application
    output. If the output is still not what you expect, set `buffered`
    to `True` to enforce buffering.

    If passed an invalid WSGI application the behavior of this function is
    undefined.  Never pass non-conforming WSGI applications to this function.

    :param app: the application to execute.
    :param buffered: set to `True` to enforce buffering.
    :return: tuple in the form ``(app_iter, status, headers)``
    """
    # Copy environ to ensure any mutations by the app (ProxyFix, for
    # example) don't affect subsequent requests (such as redirects).
    environ = _get_environ(environ).copy()
    started: t.Optional[t.Tuple[str, t.List[t.Tuple[str, str]]]] = None
    written: t.List[bytes] = []

    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal started

        if exc_info:
            try:
                raise exc_info[1].with_traceback(exc_info[2])
            finally:
                # Drop the reference to the exception info once re-raised.
                exc_info = None

        started = (status, headers)
        # The bound ``append`` serves as the WSGI ``write()`` callable.
        return written.append

    app_rv = app(environ, start_response)
    close_func = getattr(app_rv, "close", None)
    app_iter: t.Iterable[bytes] = iter(app_rv)

    if not buffered:
        # Pull items from the application until ``start_response`` has
        # been called, keeping everything received so far.
        for chunk in app_iter:
            written.append(chunk)

            if started is not None:
                break

        # Put the already collected data in front of whatever the
        # application has not produced yet.
        if written:
            app_iter = chain(written, app_iter)

        # Restore the ``close`` callable of the original return value if
        # the iterator handed back is no longer that object.
        if close_func is not None and app_iter is not app_rv:
            app_iter = ClosingIterator(app_iter, close_func)
    else:
        # When buffering, the application iterator becomes a regular list
        # and the close call is emitted early.
        try:
            app_iter = list(app_iter)
        finally:
            if close_func is not None:
                close_func()

    status, headers = started  # type: ignore
    return app_iter, status, Headers(headers)

1275 

1276 

class TestResponse(Response):
    """:class:`~werkzeug.wrappers.Response` subclass carrying extra
    information about requests made with the test :class:`Client`.

    Every request made through the test client returns an instance of
    this class. When a custom response class is given to the client, a
    subclass combining it with this class is used instead, so the test
    information stays available.

    If the test request included large files, or if the application is
    serving a file, call :meth:`close` to close any open files and
    prevent Python showing a ``ResourceWarning``.

    .. versionchanged:: 2.2
        Set the ``default_mimetype`` to None to prevent a mimetype being
        assumed if missing.

    .. versionchanged:: 2.1
        Removed deprecated behavior for treating the response instance
        as a tuple.

    .. versionadded:: 2.0
        Test client methods always return instances of this class.
    """

    # No mimetype is assumed; whatever the response itself provides is used.
    default_mimetype = None

    request: Request
    """The request object, including its environ, that was sent to the
    application and resulted in this response.
    """

    history: t.Tuple["TestResponse", ...]
    """The intermediate responses, as a tuple. Populated when the test
    request is made with ``follow_redirects`` enabled.
    """

    # Tell Pytest to ignore this, it's not a test class.
    __test__ = False

    def __init__(
        self,
        response: t.Iterable[bytes],
        status: str,
        headers: Headers,
        request: Request,
        history: t.Tuple["TestResponse"] = (),  # type: ignore
        **kwargs: t.Any,
    ) -> None:
        """Create the response and attach the test information.

        :param response: Passed to the parent class as the body.
        :param status: Passed to the parent class as the status.
        :param headers: Passed to the parent class as the headers.
        :param request: Stored as :attr:`request`.
        :param history: Stored as :attr:`history`.
        :param kwargs: Passed on to the parent class.
        """
        super().__init__(response, status, headers, **kwargs)
        self._compat_tuple = (response, status, headers)
        self.history = history
        self.request = request

    @cached_property
    def text(self) -> str:
        """The body of the response decoded to text. Equivalent to
        calling ``response.get_data(as_text=True)``.

        .. versionadded:: 2.1
        """
        decoded = self.get_data(as_text=True)
        return decoded