Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/test.py: 57%

631 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-09 06:08 +0000

1from __future__ import annotations 

2 

3import dataclasses 

4import mimetypes 

5import sys 

6import typing as t 

7import warnings 

8from collections import defaultdict 

9from datetime import datetime 

10from io import BytesIO 

11from itertools import chain 

12from random import random 

13from tempfile import TemporaryFile 

14from time import time 

15from urllib.parse import unquote 

16from urllib.parse import urlsplit 

17from urllib.parse import urlunsplit 

18 

19from ._internal import _get_environ 

20from ._internal import _make_encode_wrapper 

21from ._internal import _wsgi_decoding_dance 

22from ._internal import _wsgi_encoding_dance 

23from .datastructures import Authorization 

24from .datastructures import CallbackDict 

25from .datastructures import CombinedMultiDict 

26from .datastructures import EnvironHeaders 

27from .datastructures import FileMultiDict 

28from .datastructures import Headers 

29from .datastructures import MultiDict 

30from .http import dump_cookie 

31from .http import dump_options_header 

32from .http import parse_cookie 

33from .http import parse_date 

34from .http import parse_options_header 

35from .sansio.multipart import Data 

36from .sansio.multipart import Epilogue 

37from .sansio.multipart import Field 

38from .sansio.multipart import File 

39from .sansio.multipart import MultipartEncoder 

40from .sansio.multipart import Preamble 

41from .urls import _urlencode 

42from .urls import iri_to_uri 

43from .utils import cached_property 

44from .utils import get_content_type 

45from .wrappers.request import Request 

46from .wrappers.response import Response 

47from .wsgi import ClosingIterator 

48from .wsgi import get_current_url 

49 

50if t.TYPE_CHECKING: 

51 from _typeshed.wsgi import WSGIApplication 

52 from _typeshed.wsgi import WSGIEnvironment 

53 import typing_extensions as te 

54 

55 

56def stream_encode_multipart( 

57 data: t.Mapping[str, t.Any], 

58 use_tempfile: bool = True, 

59 threshold: int = 1024 * 500, 

60 boundary: str | None = None, 

61 charset: str | None = None, 

62) -> tuple[t.IO[bytes], int, str]: 

63 """Encode a dict of values (either strings or file descriptors or 

64 :class:`FileStorage` objects.) into a multipart encoded string stored 

65 in a file descriptor. 

66 

67 .. versionchanged:: 2.3 

68 The ``charset`` parameter is deprecated and will be removed in Werkzeug 3.0 

69 """ 

70 if charset is not None: 

71 warnings.warn( 

72 "The 'charset' parameter is deprecated and will be removed in Werkzeug 3.0", 

73 DeprecationWarning, 

74 stacklevel=2, 

75 ) 

76 else: 

77 charset = "utf-8" 

78 

79 if boundary is None: 

80 boundary = f"---------------WerkzeugFormPart_{time()}{random()}" 

81 

82 stream: t.IO[bytes] = BytesIO() 

83 total_length = 0 

84 on_disk = False 

85 write_binary: t.Callable[[bytes], int] 

86 

87 if use_tempfile: 

88 

89 def write_binary(s: bytes) -> int: 

90 nonlocal stream, total_length, on_disk 

91 

92 if on_disk: 

93 return stream.write(s) 

94 else: 

95 length = len(s) 

96 

97 if length + total_length <= threshold: 

98 stream.write(s) 

99 else: 

100 new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+")) 

101 new_stream.write(stream.getvalue()) # type: ignore 

102 new_stream.write(s) 

103 stream = new_stream 

104 on_disk = True 

105 

106 total_length += length 

107 return length 

108 

109 else: 

110 write_binary = stream.write 

111 

112 encoder = MultipartEncoder(boundary.encode()) 

113 write_binary(encoder.send_event(Preamble(data=b""))) 

114 for key, value in _iter_data(data): 

115 reader = getattr(value, "read", None) 

116 if reader is not None: 

117 filename = getattr(value, "filename", getattr(value, "name", None)) 

118 content_type = getattr(value, "content_type", None) 

119 if content_type is None: 

120 content_type = ( 

121 filename 

122 and mimetypes.guess_type(filename)[0] 

123 or "application/octet-stream" 

124 ) 

125 headers = value.headers 

126 headers.update([("Content-Type", content_type)]) 

127 if filename is None: 

128 write_binary(encoder.send_event(Field(name=key, headers=headers))) 

129 else: 

130 write_binary( 

131 encoder.send_event( 

132 File(name=key, filename=filename, headers=headers) 

133 ) 

134 ) 

135 while True: 

136 chunk = reader(16384) 

137 

138 if not chunk: 

139 break 

140 

141 write_binary(encoder.send_event(Data(data=chunk, more_data=True))) 

142 else: 

143 if not isinstance(value, str): 

144 value = str(value) 

145 write_binary(encoder.send_event(Field(name=key, headers=Headers()))) 

146 write_binary( 

147 encoder.send_event(Data(data=value.encode(charset), more_data=False)) 

148 ) 

149 

150 write_binary(encoder.send_event(Epilogue(data=b""))) 

151 

152 length = stream.tell() 

153 stream.seek(0) 

154 return stream, length, boundary 

155 

156 

157def encode_multipart( 

158 values: t.Mapping[str, t.Any], 

159 boundary: str | None = None, 

160 charset: str | None = None, 

161) -> tuple[str, bytes]: 

162 """Like `stream_encode_multipart` but returns a tuple in the form 

163 (``boundary``, ``data``) where data is bytes. 

164 

165 .. versionchanged:: 2.3 

166 The ``charset`` parameter is deprecated and will be removed in Werkzeug 3.0 

167 """ 

168 stream, length, boundary = stream_encode_multipart( 

169 values, use_tempfile=False, boundary=boundary, charset=charset 

170 ) 

171 return boundary, stream.read() 

172 

173 

174def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[tuple[str, t.Any]]: 

175 """Iterate over a mapping that might have a list of values, yielding 

176 all key, value pairs. Almost like iter_multi_items but only allows 

177 lists, not tuples, of values so tuples can be used for files. 

178 """ 

179 if isinstance(data, MultiDict): 

180 yield from data.items(multi=True) 

181 else: 

182 for key, value in data.items(): 

183 if isinstance(value, list): 

184 for v in value: 

185 yield key, v 

186 else: 

187 yield key, value 

188 

189 

190_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound=MultiDict) 

191 

192 

193class EnvironBuilder: 

194 """This class can be used to conveniently create a WSGI environment 

195 for testing purposes. It can be used to quickly create WSGI environments 

196 or request objects from arbitrary data. 

197 

198 The signature of this class is also used in some other places as of 

199 Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`, 

200 :meth:`Client.open`). Because of this most of the functionality is 

201 available through the constructor alone. 

202 

203 Files and regular form data can be manipulated independently of each 

204 other with the :attr:`form` and :attr:`files` attributes, but are 

205 passed with the same argument to the constructor: `data`. 

206 

207 `data` can be any of these values: 

208 

209 - a `str` or `bytes` object: The object is converted into an 

210 :attr:`input_stream`, the :attr:`content_length` is set and you have to 

211 provide a :attr:`content_type`. 

212 - a `dict` or :class:`MultiDict`: The keys have to be strings. The values 

213 have to be either any of the following objects, or a list of any of the 

214 following objects: 

215 

216 - a :class:`file`-like object: These are converted into 

217 :class:`FileStorage` objects automatically. 

218 - a `tuple`: The :meth:`~FileMultiDict.add_file` method is called 

219 with the key and the unpacked `tuple` items as positional 

220 arguments. 

221 - a `str`: The string is set as form data for the associated key. 

222 - a file-like object: The object content is loaded in memory and then 

223 handled like a regular `str` or a `bytes`. 

224 

225 :param path: the path of the request. In the WSGI environment this will 

226 end up as `PATH_INFO`. If the `query_string` is not defined 

227 and there is a question mark in the `path` everything after 

228 it is used as query string. 

229 :param base_url: the base URL is a URL that is used to extract the WSGI 

230 URL scheme, host (server name + server port) and the 

231 script root (`SCRIPT_NAME`). 

232 :param query_string: an optional string or dict with URL parameters. 

233 :param method: the HTTP method to use, defaults to `GET`. 

234 :param input_stream: an optional input stream. Do not specify this and 

235 `data`. As soon as an input stream is set you can't 

236 modify :attr:`args` and :attr:`files` unless you 

237 set the :attr:`input_stream` to `None` again. 

238 :param content_type: The content type for the request. As of 0.5 you 

239 don't have to provide this when specifying files 

240 and form data via `data`. 

241 :param content_length: The content length for the request. You don't 

242 have to specify this when providing data via 

243 `data`. 

244 :param errors_stream: an optional error stream that is used for 

245 `wsgi.errors`. Defaults to :data:`stderr`. 

246 :param multithread: controls `wsgi.multithread`. Defaults to `False`. 

247 :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`. 

248 :param run_once: controls `wsgi.run_once`. Defaults to `False`. 

249 :param headers: an optional list or :class:`Headers` object of headers. 

250 :param data: a string or dict of form data or a file-object. 

251 See explanation above. 

252 :param json: An object to be serialized and assigned to ``data``. 

253 Defaults the content type to ``"application/json"``. 

254 Serialized with the function assigned to :attr:`json_dumps`. 

255 :param environ_base: an optional dict of environment defaults. 

256 :param environ_overrides: an optional dict of environment overrides. 

257 :param auth: An authorization object to use for the 

258 ``Authorization`` header value. A ``(username, password)`` tuple 

259 is a shortcut for ``Basic`` authorization. 

260 

261 .. versionchanged:: 2.3 

262 The ``charset`` parameter is deprecated and will be removed in Werkzeug 3.0 

263 

264 .. versionchanged:: 2.1 

265 ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as 

266 header keys in the environ. 

267 

268 .. versionchanged:: 2.0 

269 ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including 

270 the query string, not only the path. 

271 

272 .. versionchanged:: 2.0 

273 The default :attr:`request_class` is ``Request`` instead of 

274 ``BaseRequest``. 

275 

276 .. versionadded:: 2.0 

277 Added the ``auth`` parameter. 

278 

279 .. versionadded:: 0.15 

280 The ``json`` param and :meth:`json_dumps` method. 

281 

282 .. versionadded:: 0.15 

283 The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing 

284 the path before percent-decoding. This is not part of the WSGI 

285 PEP, but many WSGI servers include it. 

286 

287 .. versionchanged:: 0.6 

288 ``path`` and ``base_url`` can now be unicode strings that are 

289 encoded with :func:`iri_to_uri`. 

290 """ 

291 

292 #: the server protocol to use. defaults to HTTP/1.1 

293 server_protocol = "HTTP/1.1" 

294 

295 #: the wsgi version to use. defaults to (1, 0) 

296 wsgi_version = (1, 0) 

297 

298 #: The default request class used by :meth:`get_request`. 

299 request_class = Request 

300 

301 import json 

302 

303 #: The serialization function used when ``json`` is passed. 

304 json_dumps = staticmethod(json.dumps) 

305 del json 

306 

307 _args: MultiDict | None 

308 _query_string: str | None 

309 _input_stream: t.IO[bytes] | None 

310 _form: MultiDict | None 

311 _files: FileMultiDict | None 

312 

313 def __init__( 

314 self, 

315 path: str = "/", 

316 base_url: str | None = None, 

317 query_string: t.Mapping[str, str] | str | None = None, 

318 method: str = "GET", 

319 input_stream: t.IO[bytes] | None = None, 

320 content_type: str | None = None, 

321 content_length: int | None = None, 

322 errors_stream: t.IO[str] | None = None, 

323 multithread: bool = False, 

324 multiprocess: bool = False, 

325 run_once: bool = False, 

326 headers: Headers | t.Iterable[tuple[str, str]] | None = None, 

327 data: None | (t.IO[bytes] | str | bytes | t.Mapping[str, t.Any]) = None, 

328 environ_base: t.Mapping[str, t.Any] | None = None, 

329 environ_overrides: t.Mapping[str, t.Any] | None = None, 

330 charset: str | None = None, 

331 mimetype: str | None = None, 

332 json: t.Mapping[str, t.Any] | None = None, 

333 auth: Authorization | tuple[str, str] | None = None, 

334 ) -> None: 

335 path_s = _make_encode_wrapper(path) 

336 if query_string is not None and path_s("?") in path: 

337 raise ValueError("Query string is defined in the path and as an argument") 

338 request_uri = urlsplit(path) 

339 if query_string is None and path_s("?") in path: 

340 query_string = request_uri.query 

341 

342 if charset is not None: 

343 warnings.warn( 

344 "The 'charset' parameter is deprecated and will be" 

345 " removed in Werkzeug 3.0", 

346 DeprecationWarning, 

347 stacklevel=2, 

348 ) 

349 else: 

350 charset = "utf-8" 

351 

352 self.charset = charset 

353 self.path = iri_to_uri(request_uri.path) 

354 self.request_uri = path 

355 if base_url is not None: 

356 base_url = iri_to_uri( 

357 base_url, charset=charset if charset != "utf-8" else None 

358 ) 

359 self.base_url = base_url # type: ignore 

360 if isinstance(query_string, str): 

361 self.query_string = query_string 

362 else: 

363 if query_string is None: 

364 query_string = MultiDict() 

365 elif not isinstance(query_string, MultiDict): 

366 query_string = MultiDict(query_string) 

367 self.args = query_string 

368 self.method = method 

369 if headers is None: 

370 headers = Headers() 

371 elif not isinstance(headers, Headers): 

372 headers = Headers(headers) 

373 self.headers = headers 

374 if content_type is not None: 

375 self.content_type = content_type 

376 if errors_stream is None: 

377 errors_stream = sys.stderr 

378 self.errors_stream = errors_stream 

379 self.multithread = multithread 

380 self.multiprocess = multiprocess 

381 self.run_once = run_once 

382 self.environ_base = environ_base 

383 self.environ_overrides = environ_overrides 

384 self.input_stream = input_stream 

385 self.content_length = content_length 

386 self.closed = False 

387 

388 if auth is not None: 

389 if isinstance(auth, tuple): 

390 auth = Authorization( 

391 "basic", {"username": auth[0], "password": auth[1]} 

392 ) 

393 

394 self.headers.set("Authorization", auth.to_header()) 

395 

396 if json is not None: 

397 if data is not None: 

398 raise TypeError("can't provide both json and data") 

399 

400 data = self.json_dumps(json) 

401 

402 if self.content_type is None: 

403 self.content_type = "application/json" 

404 

405 if data: 

406 if input_stream is not None: 

407 raise TypeError("can't provide input stream and data") 

408 if hasattr(data, "read"): 

409 data = data.read() 

410 if isinstance(data, str): 

411 data = data.encode(self.charset) 

412 if isinstance(data, bytes): 

413 self.input_stream = BytesIO(data) 

414 if self.content_length is None: 

415 self.content_length = len(data) 

416 else: 

417 for key, value in _iter_data(data): 

418 if isinstance(value, (tuple, dict)) or hasattr(value, "read"): 

419 self._add_file_from_data(key, value) 

420 else: 

421 self.form.setlistdefault(key).append(value) 

422 

423 if mimetype is not None: 

424 self.mimetype = mimetype 

425 

426 @classmethod 

427 def from_environ(cls, environ: WSGIEnvironment, **kwargs: t.Any) -> EnvironBuilder: 

428 """Turn an environ dict back into a builder. Any extra kwargs 

429 override the args extracted from the environ. 

430 

431 .. versionchanged:: 2.0 

432 Path and query values are passed through the WSGI decoding 

433 dance to avoid double encoding. 

434 

435 .. versionadded:: 0.15 

436 """ 

437 headers = Headers(EnvironHeaders(environ)) 

438 out = { 

439 "path": _wsgi_decoding_dance(environ["PATH_INFO"]), 

440 "base_url": cls._make_base_url( 

441 environ["wsgi.url_scheme"], 

442 headers.pop("Host"), 

443 _wsgi_decoding_dance(environ["SCRIPT_NAME"]), 

444 ), 

445 "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]), 

446 "method": environ["REQUEST_METHOD"], 

447 "input_stream": environ["wsgi.input"], 

448 "content_type": headers.pop("Content-Type", None), 

449 "content_length": headers.pop("Content-Length", None), 

450 "errors_stream": environ["wsgi.errors"], 

451 "multithread": environ["wsgi.multithread"], 

452 "multiprocess": environ["wsgi.multiprocess"], 

453 "run_once": environ["wsgi.run_once"], 

454 "headers": headers, 

455 } 

456 out.update(kwargs) 

457 return cls(**out) 

458 

459 def _add_file_from_data( 

460 self, 

461 key: str, 

462 value: (t.IO[bytes] | tuple[t.IO[bytes], str] | tuple[t.IO[bytes], str, str]), 

463 ) -> None: 

464 """Called in the EnvironBuilder to add files from the data dict.""" 

465 if isinstance(value, tuple): 

466 self.files.add_file(key, *value) 

467 else: 

468 self.files.add_file(key, value) 

469 

470 @staticmethod 

471 def _make_base_url(scheme: str, host: str, script_root: str) -> str: 

472 return urlunsplit((scheme, host, script_root, "", "")).rstrip("/") + "/" 

473 

474 @property 

475 def base_url(self) -> str: 

476 """The base URL is used to extract the URL scheme, host name, 

477 port, and root path. 

478 """ 

479 return self._make_base_url(self.url_scheme, self.host, self.script_root) 

480 

481 @base_url.setter 

482 def base_url(self, value: str | None) -> None: 

483 if value is None: 

484 scheme = "http" 

485 netloc = "localhost" 

486 script_root = "" 

487 else: 

488 scheme, netloc, script_root, qs, anchor = urlsplit(value) 

489 if qs or anchor: 

490 raise ValueError("base url must not contain a query string or fragment") 

491 self.script_root = script_root.rstrip("/") 

492 self.host = netloc 

493 self.url_scheme = scheme 

494 

495 @property 

496 def content_type(self) -> str | None: 

497 """The content type for the request. Reflected from and to 

498 the :attr:`headers`. Do not set if you set :attr:`files` or 

499 :attr:`form` for auto detection. 

500 """ 

501 ct = self.headers.get("Content-Type") 

502 if ct is None and not self._input_stream: 

503 if self._files: 

504 return "multipart/form-data" 

505 if self._form: 

506 return "application/x-www-form-urlencoded" 

507 return None 

508 return ct 

509 

510 @content_type.setter 

511 def content_type(self, value: str | None) -> None: 

512 if value is None: 

513 self.headers.pop("Content-Type", None) 

514 else: 

515 self.headers["Content-Type"] = value 

516 

517 @property 

518 def mimetype(self) -> str | None: 

519 """The mimetype (content type without charset etc.) 

520 

521 .. versionadded:: 0.14 

522 """ 

523 ct = self.content_type 

524 return ct.split(";")[0].strip() if ct else None 

525 

526 @mimetype.setter 

527 def mimetype(self, value: str) -> None: 

528 self.content_type = get_content_type(value, self.charset) 

529 

530 @property 

531 def mimetype_params(self) -> t.Mapping[str, str]: 

532 """The mimetype parameters as dict. For example if the 

533 content type is ``text/html; charset=utf-8`` the params would be 

534 ``{'charset': 'utf-8'}``. 

535 

536 .. versionadded:: 0.14 

537 """ 

538 

539 def on_update(d: CallbackDict) -> None: 

540 self.headers["Content-Type"] = dump_options_header(self.mimetype, d) 

541 

542 d = parse_options_header(self.headers.get("content-type", ""))[1] 

543 return CallbackDict(d, on_update) 

544 

545 @property 

546 def content_length(self) -> int | None: 

547 """The content length as integer. Reflected from and to the 

548 :attr:`headers`. Do not set if you set :attr:`files` or 

549 :attr:`form` for auto detection. 

550 """ 

551 return self.headers.get("Content-Length", type=int) 

552 

553 @content_length.setter 

554 def content_length(self, value: int | None) -> None: 

555 if value is None: 

556 self.headers.pop("Content-Length", None) 

557 else: 

558 self.headers["Content-Length"] = str(value) 

559 

560 def _get_form(self, name: str, storage: type[_TAnyMultiDict]) -> _TAnyMultiDict: 

561 """Common behavior for getting the :attr:`form` and 

562 :attr:`files` properties. 

563 

564 :param name: Name of the internal cached attribute. 

565 :param storage: Storage class used for the data. 

566 """ 

567 if self.input_stream is not None: 

568 raise AttributeError("an input stream is defined") 

569 

570 rv = getattr(self, name) 

571 

572 if rv is None: 

573 rv = storage() 

574 setattr(self, name, rv) 

575 

576 return rv # type: ignore 

577 

578 def _set_form(self, name: str, value: MultiDict) -> None: 

579 """Common behavior for setting the :attr:`form` and 

580 :attr:`files` properties. 

581 

582 :param name: Name of the internal cached attribute. 

583 :param value: Value to assign to the attribute. 

584 """ 

585 self._input_stream = None 

586 setattr(self, name, value) 

587 

588 @property 

589 def form(self) -> MultiDict: 

590 """A :class:`MultiDict` of form values.""" 

591 return self._get_form("_form", MultiDict) 

592 

593 @form.setter 

594 def form(self, value: MultiDict) -> None: 

595 self._set_form("_form", value) 

596 

597 @property 

598 def files(self) -> FileMultiDict: 

599 """A :class:`FileMultiDict` of uploaded files. Use 

600 :meth:`~FileMultiDict.add_file` to add new files. 

601 """ 

602 return self._get_form("_files", FileMultiDict) 

603 

604 @files.setter 

605 def files(self, value: FileMultiDict) -> None: 

606 self._set_form("_files", value) 

607 

608 @property 

609 def input_stream(self) -> t.IO[bytes] | None: 

610 """An optional input stream. This is mutually exclusive with 

611 setting :attr:`form` and :attr:`files`, setting it will clear 

612 those. Do not provide this if the method is not ``POST`` or 

613 another method that has a body. 

614 """ 

615 return self._input_stream 

616 

617 @input_stream.setter 

618 def input_stream(self, value: t.IO[bytes] | None) -> None: 

619 self._input_stream = value 

620 self._form = None 

621 self._files = None 

622 

623 @property 

624 def query_string(self) -> str: 

625 """The query string. If you set this to a string 

626 :attr:`args` will no longer be available. 

627 """ 

628 if self._query_string is None: 

629 if self._args is not None: 

630 return _urlencode(self._args, encoding=self.charset) 

631 return "" 

632 return self._query_string 

633 

634 @query_string.setter 

635 def query_string(self, value: str | None) -> None: 

636 self._query_string = value 

637 self._args = None 

638 

639 @property 

640 def args(self) -> MultiDict: 

641 """The URL arguments as :class:`MultiDict`.""" 

642 if self._query_string is not None: 

643 raise AttributeError("a query string is defined") 

644 if self._args is None: 

645 self._args = MultiDict() 

646 return self._args 

647 

648 @args.setter 

649 def args(self, value: MultiDict | None) -> None: 

650 self._query_string = None 

651 self._args = value 

652 

653 @property 

654 def server_name(self) -> str: 

655 """The server name (read-only, use :attr:`host` to set)""" 

656 return self.host.split(":", 1)[0] 

657 

658 @property 

659 def server_port(self) -> int: 

660 """The server port as integer (read-only, use :attr:`host` to set)""" 

661 pieces = self.host.split(":", 1) 

662 

663 if len(pieces) == 2: 

664 try: 

665 return int(pieces[1]) 

666 except ValueError: 

667 pass 

668 

669 if self.url_scheme == "https": 

670 return 443 

671 return 80 

672 

673 def __del__(self) -> None: 

674 try: 

675 self.close() 

676 except Exception: 

677 pass 

678 

679 def close(self) -> None: 

680 """Closes all files. If you put real :class:`file` objects into the 

681 :attr:`files` dict you can call this method to automatically close 

682 them all in one go. 

683 """ 

684 if self.closed: 

685 return 

686 try: 

687 files = self.files.values() 

688 except AttributeError: 

689 files = () # type: ignore 

690 for f in files: 

691 try: 

692 f.close() 

693 except Exception: 

694 pass 

695 self.closed = True 

696 

697 def get_environ(self) -> WSGIEnvironment: 

698 """Return the built environ. 

699 

700 .. versionchanged:: 0.15 

701 The content type and length headers are set based on 

702 input stream detection. Previously this only set the WSGI 

703 keys. 

704 """ 

705 input_stream = self.input_stream 

706 content_length = self.content_length 

707 

708 mimetype = self.mimetype 

709 content_type = self.content_type 

710 

711 if input_stream is not None: 

712 start_pos = input_stream.tell() 

713 input_stream.seek(0, 2) 

714 end_pos = input_stream.tell() 

715 input_stream.seek(start_pos) 

716 content_length = end_pos - start_pos 

717 elif mimetype == "multipart/form-data": 

718 charset = self.charset if self.charset != "utf-8" else None 

719 input_stream, content_length, boundary = stream_encode_multipart( 

720 CombinedMultiDict([self.form, self.files]), charset=charset 

721 ) 

722 content_type = f'{mimetype}; boundary="{boundary}"' 

723 elif mimetype == "application/x-www-form-urlencoded": 

724 form_encoded = _urlencode(self.form, encoding=self.charset).encode("ascii") 

725 content_length = len(form_encoded) 

726 input_stream = BytesIO(form_encoded) 

727 else: 

728 input_stream = BytesIO() 

729 

730 result: WSGIEnvironment = {} 

731 if self.environ_base: 

732 result.update(self.environ_base) 

733 

734 def _path_encode(x: str) -> str: 

735 return _wsgi_encoding_dance(unquote(x, encoding=self.charset), self.charset) 

736 

737 raw_uri = _wsgi_encoding_dance(self.request_uri, self.charset) 

738 result.update( 

739 { 

740 "REQUEST_METHOD": self.method, 

741 "SCRIPT_NAME": _path_encode(self.script_root), 

742 "PATH_INFO": _path_encode(self.path), 

743 "QUERY_STRING": _wsgi_encoding_dance(self.query_string, self.charset), 

744 # Non-standard, added by mod_wsgi, uWSGI 

745 "REQUEST_URI": raw_uri, 

746 # Non-standard, added by gunicorn 

747 "RAW_URI": raw_uri, 

748 "SERVER_NAME": self.server_name, 

749 "SERVER_PORT": str(self.server_port), 

750 "HTTP_HOST": self.host, 

751 "SERVER_PROTOCOL": self.server_protocol, 

752 "wsgi.version": self.wsgi_version, 

753 "wsgi.url_scheme": self.url_scheme, 

754 "wsgi.input": input_stream, 

755 "wsgi.errors": self.errors_stream, 

756 "wsgi.multithread": self.multithread, 

757 "wsgi.multiprocess": self.multiprocess, 

758 "wsgi.run_once": self.run_once, 

759 } 

760 ) 

761 

762 headers = self.headers.copy() 

763 # Don't send these as headers, they're part of the environ. 

764 headers.remove("Content-Type") 

765 headers.remove("Content-Length") 

766 

767 if content_type is not None: 

768 result["CONTENT_TYPE"] = content_type 

769 

770 if content_length is not None: 

771 result["CONTENT_LENGTH"] = str(content_length) 

772 

773 combined_headers = defaultdict(list) 

774 

775 for key, value in headers.to_wsgi_list(): 

776 combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value) 

777 

778 for key, values in combined_headers.items(): 

779 result[key] = ", ".join(values) 

780 

781 if self.environ_overrides: 

782 result.update(self.environ_overrides) 

783 

784 return result 

785 

786 def get_request(self, cls: type[Request] | None = None) -> Request: 

787 """Returns a request with the data. If the request class is not 

788 specified :attr:`request_class` is used. 

789 

790 :param cls: The request wrapper to use. 

791 """ 

792 if cls is None: 

793 cls = self.request_class 

794 

795 return cls(self.get_environ()) 

796 

797 

798class ClientRedirectError(Exception): 

799 """If a redirect loop is detected when using follow_redirects=True with 

800 the :cls:`Client`, then this exception is raised. 

801 """ 

802 

803 

804class Client: 

805 """Simulate sending requests to a WSGI application without running a WSGI or HTTP 

806 server. 

807 

808 :param application: The WSGI application to make requests to. 

809 :param response_wrapper: A :class:`.Response` class to wrap response data with. 

810 Defaults to :class:`.TestResponse`. If it's not a subclass of ``TestResponse``, 

811 one will be created. 

812 :param use_cookies: Persist cookies from ``Set-Cookie`` response headers to the 

813 ``Cookie`` header in subsequent requests. Domain and path matching is supported, 

814 but other cookie parameters are ignored. 

815 :param allow_subdomain_redirects: Allow requests to follow redirects to subdomains. 

816 Enable this if the application handles subdomains and redirects between them. 

817 

818 .. versionchanged:: 2.3 

819 Simplify cookie implementation, support domain and path matching. 

820 

821 .. versionchanged:: 2.1 

822 All data is available as properties on the returned response object. The 

823 response cannot be returned as a tuple. 

824 

825 .. versionchanged:: 2.0 

826 ``response_wrapper`` is always a subclass of :class:``TestResponse``. 

827 

828 .. versionchanged:: 0.5 

829 Added the ``use_cookies`` parameter. 

830 """ 

831 

832 def __init__( 

833 self, 

834 application: WSGIApplication, 

835 response_wrapper: type[Response] | None = None, 

836 use_cookies: bool = True, 

837 allow_subdomain_redirects: bool = False, 

838 ) -> None: 

839 self.application = application 

840 

841 if response_wrapper in {None, Response}: 

842 response_wrapper = TestResponse 

843 elif not isinstance(response_wrapper, TestResponse): 

844 response_wrapper = type( 

845 "WrapperTestResponse", 

846 (TestResponse, response_wrapper), # type: ignore 

847 {}, 

848 ) 

849 

850 self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper) 

851 

852 if use_cookies: 

853 self._cookies: dict[tuple[str, str, str], Cookie] | None = {} 

854 else: 

855 self._cookies = None 

856 

857 self.allow_subdomain_redirects = allow_subdomain_redirects 

858 

859 @property 

860 def cookie_jar(self) -> t.Iterable[Cookie] | None: 

861 warnings.warn( 

862 "The 'cookie_jar' attribute is a private API and will be removed in" 

863 " Werkzeug 3.0. Use the 'get_cookie' method instead.", 

864 DeprecationWarning, 

865 stacklevel=2, 

866 ) 

867 

868 if self._cookies is None: 

869 return None 

870 

871 return self._cookies.values() 

872 

873 def get_cookie( 

874 self, key: str, domain: str = "localhost", path: str = "/" 

875 ) -> Cookie | None: 

876 """Return a :class:`.Cookie` if it exists. Cookies are uniquely identified by 

877 ``(domain, path, key)``. 

878 

879 :param key: The decoded form of the key for the cookie. 

880 :param domain: The domain the cookie was set for. 

881 :param path: The path the cookie was set for. 

882 

883 .. versionadded:: 2.3 

884 """ 

885 if self._cookies is None: 

886 raise TypeError( 

887 "Cookies are disabled. Create a client with 'use_cookies=True'." 

888 ) 

889 

890 return self._cookies.get((domain, path, key)) 

891 

892 def set_cookie( 

893 self, 

894 key: str, 

895 value: str = "", 

896 *args: t.Any, 

897 domain: str = "localhost", 

898 origin_only: bool = True, 

899 path: str = "/", 

900 **kwargs: t.Any, 

901 ) -> None: 

902 """Set a cookie to be sent in subsequent requests. 

903 

904 This is a convenience to skip making a test request to a route that would set 

905 the cookie. To test the cookie, make a test request to a route that uses the 

906 cookie value. 

907 

908 The client uses ``domain``, ``origin_only``, and ``path`` to determine which 

909 cookies to send with a request. It does not use other cookie parameters that 

910 browsers use, since they're not applicable in tests. 

911 

912 :param key: The key part of the cookie. 

913 :param value: The value part of the cookie. 

914 :param domain: Send this cookie with requests that match this domain. If 

915 ``origin_only`` is true, it must be an exact match, otherwise it may be a 

916 suffix match. 

917 :param origin_only: Whether the domain must be an exact match to the request. 

918 :param path: Send this cookie with requests that match this path either exactly 

919 or as a prefix. 

920 :param kwargs: Passed to :func:`.dump_cookie`. 

921 

922 .. versionchanged:: 2.3 

923 The ``origin_only`` parameter was added. 

924 

925 .. versionchanged:: 2.3 

926 The ``domain`` parameter defaults to ``localhost``. 

927 

928 .. versionchanged:: 2.3 

929 The first parameter ``server_name`` is deprecated and will be removed in 

930 Werkzeug 3.0. The first parameter is ``key``. Use the ``domain`` and 

931 ``origin_only`` parameters instead. 

932 """ 

933 if self._cookies is None: 

934 raise TypeError( 

935 "Cookies are disabled. Create a client with 'use_cookies=True'." 

936 ) 

937 

938 if args: 

939 warnings.warn( 

940 "The first parameter 'server_name' is no longer used, and will be" 

941 " removed in Werkzeug 3.0. The positional parameters are 'key' and" 

942 " 'value'. Use the 'domain' and 'origin_only' parameters instead.", 

943 DeprecationWarning, 

944 stacklevel=2, 

945 ) 

946 domain = key 

947 key = value 

948 value = args[0] 

949 

950 cookie = Cookie._from_response_header( 

951 domain, "/", dump_cookie(key, value, domain=domain, path=path, **kwargs) 

952 ) 

953 cookie.origin_only = origin_only 

954 

955 if cookie._should_delete: 

956 self._cookies.pop(cookie._storage_key, None) 

957 else: 

958 self._cookies[cookie._storage_key] = cookie 

959 

960 def delete_cookie( 

961 self, 

962 key: str, 

963 *args: t.Any, 

964 domain: str = "localhost", 

965 path: str = "/", 

966 **kwargs: t.Any, 

967 ) -> None: 

968 """Delete a cookie if it exists. Cookies are uniquely identified by 

969 ``(domain, path, key)``. 

970 

971 :param key: The decoded form of the key for the cookie. 

972 :param domain: The domain the cookie was set for. 

973 :param path: The path the cookie was set for. 

974 

975 .. versionchanged:: 2.3 

976 The ``domain`` parameter defaults to ``localhost``. 

977 

978 .. versionchanged:: 2.3 

979 The first parameter ``server_name`` is deprecated and will be removed in 

980 Werkzeug 3.0. The first parameter is ``key``. Use the ``domain`` parameter 

981 instead. 

982 

983 .. versionchanged:: 2.3 

984 The ``secure``, ``httponly`` and ``samesite`` parameters are deprecated and 

985 will be removed in Werkzeug 2.4. 

986 """ 

987 if self._cookies is None: 

988 raise TypeError( 

989 "Cookies are disabled. Create a client with 'use_cookies=True'." 

990 ) 

991 

992 if args: 

993 warnings.warn( 

994 "The first parameter 'server_name' is no longer used, and will be" 

995 " removed in Werkzeug 2.4. The first parameter is 'key'. Use the" 

996 " 'domain' parameter instead.", 

997 DeprecationWarning, 

998 stacklevel=2, 

999 ) 

1000 domain = key 

1001 key = args[0] 

1002 

1003 if kwargs: 

1004 kwargs_keys = ", ".join(f"'{k}'" for k in kwargs) 

1005 plural = "parameters are" if len(kwargs) > 1 else "parameter is" 

1006 warnings.warn( 

1007 f"The {kwargs_keys} {plural} deprecated and will be" 

1008 f" removed in Werkzeug 2.4.", 

1009 DeprecationWarning, 

1010 stacklevel=2, 

1011 ) 

1012 

1013 self._cookies.pop((domain, path, key), None) 

1014 

1015 def _add_cookies_to_wsgi(self, environ: WSGIEnvironment) -> None: 

1016 """If cookies are enabled, set the ``Cookie`` header in the environ to the 

1017 cookies that are applicable to the request host and path. 

1018 

1019 :meta private: 

1020 

1021 .. versionadded:: 2.3 

1022 """ 

1023 if self._cookies is None: 

1024 return 

1025 

1026 url = urlsplit(get_current_url(environ)) 

1027 server_name = url.hostname or "localhost" 

1028 value = "; ".join( 

1029 c._to_request_header() 

1030 for c in self._cookies.values() 

1031 if c._matches_request(server_name, url.path) 

1032 ) 

1033 

1034 if value: 

1035 environ["HTTP_COOKIE"] = value 

1036 else: 

1037 environ.pop("HTTP_COOKIE", None) 

1038 

1039 def _update_cookies_from_response( 

1040 self, server_name: str, path: str, headers: list[str] 

1041 ) -> None: 

1042 """If cookies are enabled, update the stored cookies from any ``Set-Cookie`` 

1043 headers in the response. 

1044 

1045 :meta private: 

1046 

1047 .. versionadded:: 2.3 

1048 """ 

1049 if self._cookies is None: 

1050 return 

1051 

1052 for header in headers: 

1053 cookie = Cookie._from_response_header(server_name, path, header) 

1054 

1055 if cookie._should_delete: 

1056 self._cookies.pop(cookie._storage_key, None) 

1057 else: 

1058 self._cookies[cookie._storage_key] = cookie 

1059 

1060 def run_wsgi_app( 

1061 self, environ: WSGIEnvironment, buffered: bool = False 

1062 ) -> tuple[t.Iterable[bytes], str, Headers]: 

1063 """Runs the wrapped WSGI app with the given environment. 

1064 

1065 :meta private: 

1066 """ 

1067 self._add_cookies_to_wsgi(environ) 

1068 rv = run_wsgi_app(self.application, environ, buffered=buffered) 

1069 url = urlsplit(get_current_url(environ)) 

1070 self._update_cookies_from_response( 

1071 url.hostname or "localhost", url.path, rv[2].getlist("Set-Cookie") 

1072 ) 

1073 return rv 

1074 

1075 def resolve_redirect( 

1076 self, response: TestResponse, buffered: bool = False 

1077 ) -> TestResponse: 

1078 """Perform a new request to the location given by the redirect 

1079 response to the previous request. 

1080 

1081 :meta private: 

1082 """ 

1083 scheme, netloc, path, qs, anchor = urlsplit(response.location) 

1084 builder = EnvironBuilder.from_environ( 

1085 response.request.environ, path=path, query_string=qs 

1086 ) 

1087 

1088 to_name_parts = netloc.split(":", 1)[0].split(".") 

1089 from_name_parts = builder.server_name.split(".") 

1090 

1091 if to_name_parts != [""]: 

1092 # The new location has a host, use it for the base URL. 

1093 builder.url_scheme = scheme 

1094 builder.host = netloc 

1095 else: 

1096 # A local redirect with autocorrect_location_header=False 

1097 # doesn't have a host, so use the request's host. 

1098 to_name_parts = from_name_parts 

1099 

1100 # Explain why a redirect to a different server name won't be followed. 

1101 if to_name_parts != from_name_parts: 

1102 if to_name_parts[-len(from_name_parts) :] == from_name_parts: 

1103 if not self.allow_subdomain_redirects: 

1104 raise RuntimeError("Following subdomain redirects is not enabled.") 

1105 else: 

1106 raise RuntimeError("Following external redirects is not supported.") 

1107 

1108 path_parts = path.split("/") 

1109 root_parts = builder.script_root.split("/") 

1110 

1111 if path_parts[: len(root_parts)] == root_parts: 

1112 # Strip the script root from the path. 

1113 builder.path = path[len(builder.script_root) :] 

1114 else: 

1115 # The new location is not under the script root, so use the 

1116 # whole path and clear the previous root. 

1117 builder.path = path 

1118 builder.script_root = "" 

1119 

1120 # Only 307 and 308 preserve all of the original request. 

1121 if response.status_code not in {307, 308}: 

1122 # HEAD is preserved, everything else becomes GET. 

1123 if builder.method != "HEAD": 

1124 builder.method = "GET" 

1125 

1126 # Clear the body and the headers that describe it. 

1127 

1128 if builder.input_stream is not None: 

1129 builder.input_stream.close() 

1130 builder.input_stream = None 

1131 

1132 builder.content_type = None 

1133 builder.content_length = None 

1134 builder.headers.pop("Transfer-Encoding", None) 

1135 

1136 return self.open(builder, buffered=buffered) 

1137 

1138 def open( 

1139 self, 

1140 *args: t.Any, 

1141 buffered: bool = False, 

1142 follow_redirects: bool = False, 

1143 **kwargs: t.Any, 

1144 ) -> TestResponse: 

1145 """Generate an environ dict from the given arguments, make a 

1146 request to the application using it, and return the response. 

1147 

1148 :param args: Passed to :class:`EnvironBuilder` to create the 

1149 environ for the request. If a single arg is passed, it can 

1150 be an existing :class:`EnvironBuilder` or an environ dict. 

1151 :param buffered: Convert the iterator returned by the app into 

1152 a list. If the iterator has a ``close()`` method, it is 

1153 called automatically. 

1154 :param follow_redirects: Make additional requests to follow HTTP 

1155 redirects until a non-redirect status is returned. 

1156 :attr:`TestResponse.history` lists the intermediate 

1157 responses. 

1158 

1159 .. versionchanged:: 2.1 

1160 Removed the ``as_tuple`` parameter. 

1161 

1162 .. versionchanged:: 2.0 

1163 The request input stream is closed when calling 

1164 ``response.close()``. Input streams for redirects are 

1165 automatically closed. 

1166 

1167 .. versionchanged:: 0.5 

1168 If a dict is provided as file in the dict for the ``data`` 

1169 parameter the content type has to be called ``content_type`` 

1170 instead of ``mimetype``. This change was made for 

1171 consistency with :class:`werkzeug.FileWrapper`. 

1172 

1173 .. versionchanged:: 0.5 

1174 Added the ``follow_redirects`` parameter. 

1175 """ 

1176 request: Request | None = None 

1177 

1178 if not kwargs and len(args) == 1: 

1179 arg = args[0] 

1180 

1181 if isinstance(arg, EnvironBuilder): 

1182 request = arg.get_request() 

1183 elif isinstance(arg, dict): 

1184 request = EnvironBuilder.from_environ(arg).get_request() 

1185 elif isinstance(arg, Request): 

1186 request = arg 

1187 

1188 if request is None: 

1189 builder = EnvironBuilder(*args, **kwargs) 

1190 

1191 try: 

1192 request = builder.get_request() 

1193 finally: 

1194 builder.close() 

1195 

1196 response = self.run_wsgi_app(request.environ, buffered=buffered) 

1197 response = self.response_wrapper(*response, request=request) 

1198 

1199 redirects = set() 

1200 history: list[TestResponse] = [] 

1201 

1202 if not follow_redirects: 

1203 return response 

1204 

1205 while response.status_code in { 

1206 301, 

1207 302, 

1208 303, 

1209 305, 

1210 307, 

1211 308, 

1212 }: 

1213 # Exhaust intermediate response bodies to ensure middleware 

1214 # that returns an iterator runs any cleanup code. 

1215 if not buffered: 

1216 response.make_sequence() 

1217 response.close() 

1218 

1219 new_redirect_entry = (response.location, response.status_code) 

1220 

1221 if new_redirect_entry in redirects: 

1222 raise ClientRedirectError( 

1223 f"Loop detected: A {response.status_code} redirect" 

1224 f" to {response.location} was already made." 

1225 ) 

1226 

1227 redirects.add(new_redirect_entry) 

1228 response.history = tuple(history) 

1229 history.append(response) 

1230 response = self.resolve_redirect(response, buffered=buffered) 

1231 else: 

1232 # This is the final request after redirects. 

1233 response.history = tuple(history) 

1234 # Close the input stream when closing the response, in case 

1235 # the input is an open temporary file. 

1236 response.call_on_close(request.input_stream.close) 

1237 return response 

1238 

1239 def get(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1240 """Call :meth:`open` with ``method`` set to ``GET``.""" 

1241 kw["method"] = "GET" 

1242 return self.open(*args, **kw) 

1243 

1244 def post(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1245 """Call :meth:`open` with ``method`` set to ``POST``.""" 

1246 kw["method"] = "POST" 

1247 return self.open(*args, **kw) 

1248 

1249 def put(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1250 """Call :meth:`open` with ``method`` set to ``PUT``.""" 

1251 kw["method"] = "PUT" 

1252 return self.open(*args, **kw) 

1253 

1254 def delete(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1255 """Call :meth:`open` with ``method`` set to ``DELETE``.""" 

1256 kw["method"] = "DELETE" 

1257 return self.open(*args, **kw) 

1258 

1259 def patch(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1260 """Call :meth:`open` with ``method`` set to ``PATCH``.""" 

1261 kw["method"] = "PATCH" 

1262 return self.open(*args, **kw) 

1263 

1264 def options(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1265 """Call :meth:`open` with ``method`` set to ``OPTIONS``.""" 

1266 kw["method"] = "OPTIONS" 

1267 return self.open(*args, **kw) 

1268 

1269 def head(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1270 """Call :meth:`open` with ``method`` set to ``HEAD``.""" 

1271 kw["method"] = "HEAD" 

1272 return self.open(*args, **kw) 

1273 

1274 def trace(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1275 """Call :meth:`open` with ``method`` set to ``TRACE``.""" 

1276 kw["method"] = "TRACE" 

1277 return self.open(*args, **kw) 

1278 

1279 def __repr__(self) -> str: 

1280 return f"<{type(self).__name__} {self.application!r}>" 

1281 

1282 

1283def create_environ(*args: t.Any, **kwargs: t.Any) -> WSGIEnvironment: 

1284 """Create a new WSGI environ dict based on the values passed. The first 

1285 parameter should be the path of the request which defaults to '/'. The 

1286 second one can either be an absolute path (in that case the host is 

1287 localhost:80) or a full path to the request with scheme, netloc port and 

1288 the path to the script. 

1289 

1290 This accepts the same arguments as the :class:`EnvironBuilder` 

1291 constructor. 

1292 

1293 .. versionchanged:: 0.5 

1294 This function is now a thin wrapper over :class:`EnvironBuilder` which 

1295 was added in 0.5. The `headers`, `environ_base`, `environ_overrides` 

1296 and `charset` parameters were added. 

1297 """ 

1298 builder = EnvironBuilder(*args, **kwargs) 

1299 

1300 try: 

1301 return builder.get_environ() 

1302 finally: 

1303 builder.close() 

1304 

1305 

1306def run_wsgi_app( 

1307 app: WSGIApplication, environ: WSGIEnvironment, buffered: bool = False 

1308) -> tuple[t.Iterable[bytes], str, Headers]: 

1309 """Return a tuple in the form (app_iter, status, headers) of the 

1310 application output. This works best if you pass it an application that 

1311 returns an iterator all the time. 

1312 

1313 Sometimes applications may use the `write()` callable returned 

1314 by the `start_response` function. This tries to resolve such edge 

1315 cases automatically. But if you don't get the expected output you 

1316 should set `buffered` to `True` which enforces buffering. 

1317 

1318 If passed an invalid WSGI application the behavior of this function is 

1319 undefined. Never pass non-conforming WSGI applications to this function. 

1320 

1321 :param app: the application to execute. 

1322 :param buffered: set to `True` to enforce buffering. 

1323 :return: tuple in the form ``(app_iter, status, headers)`` 

1324 """ 

1325 # Copy environ to ensure any mutations by the app (ProxyFix, for 

1326 # example) don't affect subsequent requests (such as redirects). 

1327 environ = _get_environ(environ).copy() 

1328 status: str 

1329 response: tuple[str, list[tuple[str, str]]] | None = None 

1330 buffer: list[bytes] = [] 

1331 

1332 def start_response(status, headers, exc_info=None): # type: ignore 

1333 nonlocal response 

1334 

1335 if exc_info: 

1336 try: 

1337 raise exc_info[1].with_traceback(exc_info[2]) 

1338 finally: 

1339 exc_info = None 

1340 

1341 response = (status, headers) 

1342 return buffer.append 

1343 

1344 app_rv = app(environ, start_response) 

1345 close_func = getattr(app_rv, "close", None) 

1346 app_iter: t.Iterable[bytes] = iter(app_rv) 

1347 

1348 # when buffering we emit the close call early and convert the 

1349 # application iterator into a regular list 

1350 if buffered: 

1351 try: 

1352 app_iter = list(app_iter) 

1353 finally: 

1354 if close_func is not None: 

1355 close_func() 

1356 

1357 # otherwise we iterate the application iter until we have a response, chain 

1358 # the already received data with the already collected data and wrap it in 

1359 # a new `ClosingIterator` if we need to restore a `close` callable from the 

1360 # original return value. 

1361 else: 

1362 for item in app_iter: 

1363 buffer.append(item) 

1364 

1365 if response is not None: 

1366 break 

1367 

1368 if buffer: 

1369 app_iter = chain(buffer, app_iter) 

1370 

1371 if close_func is not None and app_iter is not app_rv: 

1372 app_iter = ClosingIterator(app_iter, close_func) 

1373 

1374 status, headers = response # type: ignore 

1375 return app_iter, status, Headers(headers) 

1376 

1377 

1378class TestResponse(Response): 

1379 """:class:`~werkzeug.wrappers.Response` subclass that provides extra 

1380 information about requests made with the test :class:`Client`. 

1381 

1382 Test client requests will always return an instance of this class. 

1383 If a custom response class is passed to the client, it is 

1384 subclassed along with this to support test information. 

1385 

1386 If the test request included large files, or if the application is 

1387 serving a file, call :meth:`close` to close any open files and 

1388 prevent Python showing a ``ResourceWarning``. 

1389 

1390 .. versionchanged:: 2.2 

1391 Set the ``default_mimetype`` to None to prevent a mimetype being 

1392 assumed if missing. 

1393 

1394 .. versionchanged:: 2.1 

1395 Response instances cannot be treated as tuples. 

1396 

1397 .. versionadded:: 2.0 

1398 Test client methods always return instances of this class. 

1399 """ 

1400 

1401 default_mimetype = None 

1402 # Don't assume a mimetype, instead use whatever the response provides 

1403 

1404 request: Request 

1405 """A request object with the environ used to make the request that 

1406 resulted in this response. 

1407 """ 

1408 

1409 history: tuple[TestResponse, ...] 

1410 """A list of intermediate responses. Populated when the test request 

1411 is made with ``follow_redirects`` enabled. 

1412 """ 

1413 

1414 # Tell Pytest to ignore this, it's not a test class. 

1415 __test__ = False 

1416 

1417 def __init__( 

1418 self, 

1419 response: t.Iterable[bytes], 

1420 status: str, 

1421 headers: Headers, 

1422 request: Request, 

1423 history: tuple[TestResponse] = (), # type: ignore 

1424 **kwargs: t.Any, 

1425 ) -> None: 

1426 super().__init__(response, status, headers, **kwargs) 

1427 self.request = request 

1428 self.history = history 

1429 self._compat_tuple = response, status, headers 

1430 

1431 @cached_property 

1432 def text(self) -> str: 

1433 """The response data as text. A shortcut for 

1434 ``response.get_data(as_text=True)``. 

1435 

1436 .. versionadded:: 2.1 

1437 """ 

1438 return self.get_data(as_text=True) 

1439 

1440 

1441@dataclasses.dataclass 

1442class Cookie: 

1443 """A cookie key, value, and parameters. 

1444 

1445 The class itself is not a public API. Its attributes are documented for inspection 

1446 with :meth:`.Client.get_cookie` only. 

1447 

1448 .. versionadded:: 2.3 

1449 """ 

1450 

1451 key: str 

1452 """The cookie key, encoded as a client would see it.""" 

1453 

1454 value: str 

1455 """The cookie key, encoded as a client would see it.""" 

1456 

1457 decoded_key: str 

1458 """The cookie key, decoded as the application would set and see it.""" 

1459 

1460 decoded_value: str 

1461 """The cookie value, decoded as the application would set and see it.""" 

1462 

1463 expires: datetime | None 

1464 """The time at which the cookie is no longer valid.""" 

1465 

1466 max_age: int | None 

1467 """The number of seconds from when the cookie was set at which it is 

1468 no longer valid. 

1469 """ 

1470 

1471 domain: str 

1472 """The domain that the cookie was set for, or the request domain if not set.""" 

1473 

1474 origin_only: bool 

1475 """Whether the cookie will be sent for exact domain matches only. This is ``True`` 

1476 if the ``Domain`` parameter was not present. 

1477 """ 

1478 

1479 path: str 

1480 """The path that the cookie was set for.""" 

1481 

1482 secure: bool | None 

1483 """The ``Secure`` parameter.""" 

1484 

1485 http_only: bool | None 

1486 """The ``HttpOnly`` parameter.""" 

1487 

1488 same_site: str | None 

1489 """The ``SameSite`` parameter.""" 

1490 

1491 def _matches_request(self, server_name: str, path: str) -> bool: 

1492 return ( 

1493 server_name == self.domain 

1494 or ( 

1495 not self.origin_only 

1496 and server_name.endswith(self.domain) 

1497 and server_name[: -len(self.domain)].endswith(".") 

1498 ) 

1499 ) and ( 

1500 path == self.path 

1501 or ( 

1502 path.startswith(self.path) 

1503 and path[len(self.path) - self.path.endswith("/") :].startswith("/") 

1504 ) 

1505 ) 

1506 

1507 def _to_request_header(self) -> str: 

1508 return f"{self.key}={self.value}" 

1509 

1510 @classmethod 

1511 def _from_response_header(cls, server_name: str, path: str, header: str) -> te.Self: 

1512 header, _, parameters_str = header.partition(";") 

1513 key, _, value = header.partition("=") 

1514 decoded_key, decoded_value = next(parse_cookie(header).items()) 

1515 params = {} 

1516 

1517 for item in parameters_str.split(";"): 

1518 k, sep, v = item.partition("=") 

1519 params[k.strip().lower()] = v.strip() if sep else None 

1520 

1521 return cls( 

1522 key=key.strip(), 

1523 value=value.strip(), 

1524 decoded_key=decoded_key, 

1525 decoded_value=decoded_value, 

1526 expires=parse_date(params.get("expires")), 

1527 max_age=int(params["max-age"] or 0) if "max-age" in params else None, 

1528 domain=params.get("domain") or server_name, 

1529 origin_only="domain" not in params, 

1530 path=params.get("path") or path.rpartition("/")[0] or "/", 

1531 secure="secure" in params, 

1532 http_only="httponly" in params, 

1533 same_site=params.get("samesite"), 

1534 ) 

1535 

1536 @property 

1537 def _storage_key(self) -> tuple[str, str, str]: 

1538 return self.domain, self.path, self.decoded_key 

1539 

1540 @property 

1541 def _should_delete(self) -> bool: 

1542 return self.max_age == 0 or ( 

1543 self.expires is not None and self.expires.timestamp() == 0 

1544 )