Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/test.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

597 statements  

1from __future__ import annotations 

2 

3import dataclasses 

4import json 

5import mimetypes 

6import sys 

7import typing as t 

8from collections import defaultdict 

9from datetime import datetime 

10from io import BytesIO 

11from itertools import chain 

12from random import random 

13from tempfile import TemporaryFile 

14from time import time 

15from types import TracebackType 

16from urllib.parse import unquote 

17from urllib.parse import urlsplit 

18from urllib.parse import urlunsplit 

19 

20from ._internal import _get_environ 

21from ._internal import _wsgi_decoding_dance 

22from ._internal import _wsgi_encoding_dance 

23from .datastructures import Authorization 

24from .datastructures import CallbackDict 

25from .datastructures import CombinedMultiDict 

26from .datastructures import EnvironHeaders 

27from .datastructures import FileMultiDict 

28from .datastructures import Headers 

29from .datastructures import MultiDict 

30from .http import dump_cookie 

31from .http import dump_options_header 

32from .http import parse_cookie 

33from .http import parse_date 

34from .http import parse_options_header 

35from .sansio.multipart import Data 

36from .sansio.multipart import Epilogue 

37from .sansio.multipart import Field 

38from .sansio.multipart import File 

39from .sansio.multipart import MultipartEncoder 

40from .sansio.multipart import Preamble 

41from .urls import _urlencode 

42from .urls import iri_to_uri 

43from .utils import cached_property 

44from .utils import get_content_type 

45from .wrappers.request import Request 

46from .wrappers.response import Response 

47from .wsgi import ClosingIterator 

48from .wsgi import get_current_url 

49 

50if t.TYPE_CHECKING: 

51 import typing_extensions as te 

52 from _typeshed.wsgi import WSGIApplication 

53 from _typeshed.wsgi import WSGIEnvironment 

54 

55 

def stream_encode_multipart(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: str | None = None,
) -> tuple[t.IO[bytes], int, str]:
    """Encode a dict of values (either strings or file descriptors or
    :class:`FileStorage` objects.) into a multipart encoded string stored
    in a file descriptor.

    :param data: Mapping of field names to values. A value that has a
        ``read`` attribute is streamed as a file part; any other value is
        converted with :class:`str` and sent as a text field. Lists of
        values are expanded by :func:`_iter_data`.
    :param use_tempfile: If true, the in-memory buffer is swapped for a
        :func:`tempfile.TemporaryFile` once more than ``threshold`` bytes
        have been written.
    :param threshold: Number of bytes kept in memory before spilling to
        the temporary file. Only used when ``use_tempfile`` is true.
    :param boundary: The multipart boundary. A new one is generated from
        the current time and a random number when omitted.
    :return: ``(stream, length, boundary)``. ``stream`` is rewound to
        position 0 and ``length`` is the number of bytes written to it.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.
    """
    if boundary is None:
        boundary = f"---------------WerkzeugFormPart_{time()}{random()}"

    stream: t.IO[bytes] = BytesIO()
    total_length = 0
    on_disk = False
    write_binary: t.Callable[[bytes], int]

    if use_tempfile:

        def write_binary(s: bytes) -> int:
            nonlocal stream, total_length, on_disk

            if on_disk:
                return stream.write(s)
            else:
                length = len(s)

                if length + total_length <= threshold:
                    stream.write(s)
                else:
                    # Crossing the threshold: copy what was buffered so far
                    # into a temporary file and keep writing there.
                    new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
                    new_stream.write(stream.getvalue())  # type: ignore
                    new_stream.write(s)
                    stream = new_stream
                    on_disk = True

                total_length += length
                return length

    else:
        write_binary = stream.write

    encoder = MultipartEncoder(boundary.encode())
    write_binary(encoder.send_event(Preamble(data=b"")))
    for key, value in _iter_data(data):
        reader = getattr(value, "read", None)
        if reader is not None:
            filename = getattr(value, "filename", getattr(value, "name", None))
            content_type = getattr(value, "content_type", None)
            if content_type is None:
                guessed = mimetypes.guess_type(filename)[0] if filename else None
                content_type = guessed or "application/octet-stream"
            # Plain file objects have no ``headers`` attribute; start from an
            # empty header set for them instead of raising AttributeError.
            headers = getattr(value, "headers", None)
            if headers is None:
                headers = Headers()
            headers.update([("Content-Type", content_type)])
            if filename is None:
                write_binary(encoder.send_event(Field(name=key, headers=headers)))
            else:
                write_binary(
                    encoder.send_event(
                        File(name=key, filename=filename, headers=headers)
                    )
                )
            while True:
                chunk = reader(16384)

                if not chunk:
                    # An empty read marks the end of this part.
                    write_binary(encoder.send_event(Data(data=chunk, more_data=False)))
                    break

                write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
        else:
            if not isinstance(value, str):
                value = str(value)
            write_binary(encoder.send_event(Field(name=key, headers=Headers())))
            write_binary(encoder.send_event(Data(data=value.encode(), more_data=False)))

    write_binary(encoder.send_event(Epilogue(data=b"")))

    # The current position equals the total number of bytes written.
    length = stream.tell()
    stream.seek(0)
    return stream, length, boundary

144 

145 

def encode_multipart(
    values: t.Mapping[str, t.Any], boundary: str | None = None
) -> tuple[str, bytes]:
    """Encode ``values`` like :func:`stream_encode_multipart`, fully in
    memory, and return ``(boundary, data)`` where ``data`` is the encoded
    body as bytes.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.
    """
    encoded = stream_encode_multipart(values, use_tempfile=False, boundary=boundary)
    # The middle element (the byte length) is not needed here.
    body_stream, used_boundary = encoded[0], encoded[2]
    return used_boundary, body_stream.read()

159 

160 

def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[tuple[str, t.Any]]:
    """Yield every ``(key, value)`` pair of a mapping whose values may be
    lists of values. Similar to iter_multi_items, except that only lists
    (not tuples) are expanded, so tuples can be used to describe files.
    """
    if isinstance(data, MultiDict):
        yield from data.items(multi=True)
        return

    for name, entry in data.items():
        # A list stands for several values under the same key.
        entries = entry if isinstance(entry, list) else [entry]

        for item in entries:
            yield name, item

175 

176 

class EnvironBuilder:
    """This class can be used to conveniently create a WSGI environment
    for testing purposes. It can be used to quickly create WSGI environments
    or request objects from arbitrary data.

    The signature of this class is also used in some other places as of
    Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
    :meth:`Client.open`). Because of this most of the functionality is
    available through the constructor alone.

    :param path: the path of the request. In the WSGI environment this will
        end up as `PATH_INFO`. If the `query_string` is not defined
        and there is a question mark in the `path` everything after
        it is used as query string.
    :param base_url: the base URL is a URL that is used to extract the WSGI
        URL scheme, host (server name + server port) and the
        script root (`SCRIPT_NAME`).
    :param query_string: A :class:`dict` or :class:`.MultiDict` to encode as the
        query string of the URL, which sets :attr:`args`. Or a string, which
        sets :attr:`query_string`, in which case :attr:`args` cannot be used.
    :param method: the HTTP method to use, defaults to `GET`.
    :param content_type: The content type for the request. As of 0.5 you
        don't have to provide this when specifying files
        and form data via `data`.
    :param content_length: The content length for the request. You don't
        have to specify this when providing data via
        `data`.
    :param errors_stream: an optional error stream that is used for
        `wsgi.errors`. Defaults to :data:`stderr`.
    :param multithread: controls `wsgi.multithread`. Defaults to `False`.
    :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`.
    :param run_once: controls `wsgi.run_once`. Defaults to `False`.
    :param headers: an optional list or :class:`Headers` object of headers.
    :param data: A dict of form and file data to encode as the body of the
        request; file values can be an IO object, ``(stream, filename)``,
        ``(stream, filename, content type)``, or :class:`.FileStorage`.
        Alternatively, pass raw bytes to set as :attr:`input_stream`, or an IO
        object to read and set. ``json`` can be used to set JSON data instead.
        ``content_length`` is set automatically.
    :param json: An object to be serialized and assigned to ``data``.
        Defaults the content type to ``"application/json"``.
        Serialized with the function assigned to :attr:`json_dumps`.
    :param environ_base: an optional dict of environment defaults.
    :param environ_overrides: an optional dict of environment overrides.
    :param mimetype: If given, assigned to :attr:`mimetype` after all other
        arguments have been processed, which sets :attr:`content_type`.
    :param auth: An authorization object to use for the
        ``Authorization`` header value. A ``(username, password)`` tuple
        is a shortcut for ``Basic`` authorization.
    :param input_stream: An IO object to pass through as the body of the
        request, without reading, which simulates a streaming request. The
        stream is not closed when calling :meth:`.close`, as it must remain open
        to be read in the application.

    .. versionchanged:: 3.2
        Can be used as a ``with`` context manager to automatically close
        :attr:`files`.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.

    .. versionchanged:: 2.1
        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
        header keys in the environ.

    .. versionchanged:: 2.0
        ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
        the query string, not only the path.

    .. versionchanged:: 2.0
        The default :attr:`request_class` is ``Request`` instead of
        ``BaseRequest``.

    .. versionadded:: 2.0
        Added the ``auth`` parameter.

    .. versionadded:: 0.15
        The ``json`` param and :meth:`json_dumps` method.

    .. versionadded:: 0.15
        The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
        the path before percent-decoding. This is not part of the WSGI
        PEP, but many WSGI servers include it.

    .. versionchanged:: 0.6
        ``path`` and ``base_url`` can now be unicode strings that are
        encoded with :func:`iri_to_uri`.
    """

    #: the server protocol to use. defaults to HTTP/1.1
    server_protocol = "HTTP/1.1"

    #: the wsgi version to use. defaults to (1, 0)
    wsgi_version = (1, 0)

    #: The default request class used by :meth:`get_request`.
    request_class = Request

    #: The serialization function used when ``json`` is passed.
    json_dumps = staticmethod(json.dumps)

    # Class-level fallbacks only. They keep ``close()`` / ``__del__`` working
    # on an instance whose ``__init__`` did not run. ``__init__`` always
    # rebinds these to per-instance objects so builders never share state.
    _args: MultiDict[str, str] = MultiDict()
    _query_string: str | None = None
    _form: MultiDict[str, str] = MultiDict()
    _files: FileMultiDict = FileMultiDict()
    _input_stream: t.IO[bytes] | None = None

    def __init__(
        self,
        path: str = "/",
        base_url: str | None = None,
        query_string: t.Mapping[str, str] | str | None = None,
        method: str = "GET",
        input_stream: t.IO[bytes] | None = None,
        content_type: str | None = None,
        content_length: int | None = None,
        errors_stream: t.IO[str] | None = None,
        multithread: bool = False,
        multiprocess: bool = False,
        run_once: bool = False,
        headers: Headers | t.Iterable[tuple[str, str]] | None = None,
        data: t.IO[bytes] | str | bytes | t.Mapping[str, t.Any] | None = None,
        environ_base: t.Mapping[str, t.Any] | None = None,
        environ_overrides: t.Mapping[str, t.Any] | None = None,
        mimetype: str | None = None,
        json: t.Mapping[str, t.Any] | None = None,
        auth: Authorization | tuple[str, str] | None = None,
    ) -> None:
        # Give every builder its own containers. The property setters used
        # below call ``clear()`` on these, which must never touch the
        # class-level fallbacks or another builder's data.
        self._args = MultiDict()
        self._query_string = None
        self._form = MultiDict()
        self._files = FileMultiDict()
        self._input_stream = None

        if query_string is not None and "?" in path:
            raise ValueError("Query string is defined in the path and as an argument")
        request_uri = urlsplit(path)
        if query_string is None and "?" in path:
            query_string = request_uri.query

        self.path = iri_to_uri(request_uri.path)
        self.request_uri = path
        if base_url is not None:
            base_url = iri_to_uri(base_url)
        self.base_url = base_url
        if isinstance(query_string, str):
            self.query_string = query_string
        else:
            if query_string is None:
                query_string = MultiDict()
            elif not isinstance(query_string, MultiDict):
                query_string = MultiDict(query_string)
            self.args = query_string
        self.method = method
        if headers is None:
            headers = Headers()
        elif not isinstance(headers, Headers):
            headers = Headers(headers)
        self.headers = headers
        if content_type is not None:
            self.content_type = content_type
        if errors_stream is None:
            errors_stream = sys.stderr
        self.errors_stream = errors_stream
        self.multithread = multithread
        self.multiprocess = multiprocess
        self.run_once = run_once
        self.environ_base = environ_base
        self.environ_overrides = environ_overrides
        self.input_stream = input_stream
        self.content_length = content_length

        if auth is not None:
            if isinstance(auth, tuple):
                auth = Authorization(
                    "basic", {"username": auth[0], "password": auth[1]}
                )

            self.headers.set("Authorization", auth.to_header())

        if json is not None:
            if data is not None:
                raise TypeError("can't provide both json and data")

            data = self.json_dumps(json)

            if self.content_type is None:
                self.content_type = "application/json"

        if data:
            if input_stream is not None:
                raise TypeError("can't provide input stream and data")
            if hasattr(data, "read"):
                data = data.read()
            if isinstance(data, str):
                data = data.encode()
            if isinstance(data, bytes):
                self.input_stream = BytesIO(data)
                if self.content_length is None:
                    self.content_length = len(data)
            else:
                # A mapping: split the values into files and form fields.
                for key, value in _iter_data(data):
                    if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
                        self._add_file_from_data(key, value)
                    else:
                        self.form.setlistdefault(key).append(value)

        if mimetype is not None:
            self.mimetype = mimetype

    @classmethod
    def from_environ(cls, environ: WSGIEnvironment, **kwargs: t.Any) -> te.Self:
        """Turn an environ dict back into a builder. Any extra kwargs
        override the args extracted from the environ.

        .. versionchanged:: 2.0
            Path and query values are passed through the WSGI decoding
            dance to avoid double encoding.

        .. versionadded:: 0.15
        """
        headers = Headers(EnvironHeaders(environ))
        out = {
            "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
            "base_url": cls._make_base_url(
                environ["wsgi.url_scheme"],
                headers.pop("Host"),
                _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
            ),
            "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
            "method": environ["REQUEST_METHOD"],
            "input_stream": environ["wsgi.input"],
            "content_type": headers.pop("Content-Type", None),
            "content_length": headers.pop("Content-Length", None),
            "errors_stream": environ["wsgi.errors"],
            "multithread": environ["wsgi.multithread"],
            "multiprocess": environ["wsgi.multiprocess"],
            "run_once": environ["wsgi.run_once"],
            "headers": headers,
        }
        out.update(kwargs)
        return cls(**out)

    def _add_file_from_data(
        self,
        key: str,
        value: (t.IO[bytes] | tuple[t.IO[bytes], str] | tuple[t.IO[bytes], str, str]),
    ) -> None:
        """Called in the EnvironBuilder to add files from the data dict."""
        if isinstance(value, tuple):
            # A tuple is unpacked into the positional arguments of add_file.
            self.files.add_file(key, *value)
        else:
            self.files.add_file(key, value)

    @staticmethod
    def _make_base_url(scheme: str, host: str, script_root: str) -> str:
        """Join scheme, host and script root into a URL that ends with
        exactly one ``/``.
        """
        return urlunsplit((scheme, host, script_root, "", "")).rstrip("/") + "/"

    @property
    def base_url(self) -> str:
        """The base URL is used to extract the URL scheme, host name,
        port, and root path.
        """
        return self._make_base_url(self.url_scheme, self.host, self.script_root)

    @base_url.setter
    def base_url(self, value: str | None) -> None:
        if value is None:
            scheme = "http"
            netloc = "localhost"
            script_root = ""
        else:
            scheme, netloc, script_root, qs, anchor = urlsplit(value)
            if qs or anchor:
                raise ValueError("base url must not contain a query string or fragment")
        self.script_root = script_root.rstrip("/")
        self.host = netloc
        self.url_scheme = scheme

    @property
    def content_type(self) -> str | None:
        """The content type for the request. Reflected from and to
        the :attr:`headers`. Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        ct = self.headers.get("Content-Type")
        if ct is None and not self._input_stream:
            # No explicit header and no raw body: detect from the data set.
            if self._files:
                return "multipart/form-data"
            if self._form:
                return "application/x-www-form-urlencoded"
            return None
        return ct

    @content_type.setter
    def content_type(self, value: str | None) -> None:
        if value is None:
            self.headers.pop("Content-Type", None)
        else:
            self.headers["Content-Type"] = value

    @property
    def mimetype(self) -> str | None:
        """The mimetype (content type without charset etc.)

        .. versionadded:: 0.14
        """
        ct = self.content_type
        return ct.split(";")[0].strip() if ct else None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        self.content_type = get_content_type(value, "utf-8")

    @property
    def mimetype_params(self) -> t.Mapping[str, str]:
        """The mimetype parameters as dict. For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.14
        """

        def on_update(d: CallbackDict[str, str]) -> None:
            # Write changes to the returned dict back to the header.
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        d = parse_options_header(self.headers.get("Content-Type", ""))[1]
        return CallbackDict(d, on_update)

    @property
    def content_length(self) -> int | None:
        """The content length as integer. Reflected from and to the
        :attr:`headers`. Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        return self.headers.get("Content-Length", type=int)

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        if value is None:
            self.headers.pop("Content-Length", None)
        else:
            self.headers["Content-Length"] = str(value)

    @property
    def form(self) -> MultiDict[str, str]:
        """Form data text values. File values are stored in :attr:`files`.

        If any values are set and no files are set, the request body will be
        encoded and the content type set to ``application/x-www-form-urlencoded``.

        Set when the constructor ``data`` parameter is a dict or multidict.

        Cannot be accessed if :attr:`input_stream` is set. Setting this will
        unset :attr:`input_stream`.
        """
        if self.input_stream is not None:
            raise AttributeError("Not available when 'input_stream' is set.")

        return self._form

    @form.setter
    def form(self, value: MultiDict[str, str]) -> None:
        self._input_stream = None
        self._form = value

    @property
    def files(self) -> FileMultiDict:
        """Form data file values. Text values are stored in :attr:`form`.

        If any values are set, the request body will be encoded and the content
        type set to ``multipart/form-data``.

        Set when the constructor ``data`` parameter is a dict or multidict.

        The :meth:`.FileMultiDict.add_file` method provides a convenient way to
        add more files without needing to construct :class:`.FileStorage`
        objects.

        The file streams will be read when encoding the data. All streams will
        be closed when the builder's :meth:`.close` method is called.

        Cannot be accessed if :attr:`input_stream` is set. Setting this will
        unset :attr:`input_stream`.

        Setting this will _not_ close files in the previous dict, call
        :meth:`.FileMultiDict.close` first if that's needed.
        """
        if self.input_stream is not None:
            raise AttributeError("Not available when 'input_stream' is set.")

        return self._files

    @files.setter
    def files(self, value: FileMultiDict) -> None:
        self._input_stream = None
        self._files = value

    @property
    def input_stream(self) -> t.IO[bytes] | None:
        """A binary IO object to pass through as the body of the request,
        without reading, which simulates a streaming request.

        If this is set, :attr:`form` and :attr:`files` cannot be accessed.
        If those are set, this will be ``None``. Setting this will close any
        files and clear those.

        The stream is not closed when calling the builder's :meth:`close`
        method, as it must remain open to be read in the application.

        .. versionchanged:: 3.2
            Any values in :attr:`files` are closed first when setting this.
        """
        return self._input_stream

    @input_stream.setter
    def input_stream(self, value: t.IO[bytes] | None) -> None:
        self._form.clear()
        # NOTE(review): only ``clear()`` is called here. Whether
        # ``FileMultiDict.clear()`` also closes the files, as the property
        # docstring promises, is not visible from this file -- confirm.
        self._files.clear()
        self._input_stream = value

    @property
    def query_string(self) -> str:
        """The URL query string.

        If this is set to a string, :attr:`args` cannot be accessed. If
        :attr:`args` is set, this will be the encoded value of that. If neither
        is set, this is the empty string.
        """
        if self._query_string is None:
            if self._args is not None:
                return _urlencode(self._args)

            return ""

        return self._query_string

    @query_string.setter
    def query_string(self, value: str | None) -> None:
        self._args.clear()
        self._query_string = value

    @property
    def args(self) -> MultiDict[str, str]:
        """The URL query string as a :class:`MultiDict`.

        Set when the constructor ``query_string`` parameter is a dict or
        multidict.

        Setting this will unset :attr:`query_string`.
        """
        if self._query_string is not None:
            raise AttributeError("Not available when 'query_string' is set.")

        return self._args

    @args.setter
    def args(self, value: MultiDict[str, str]) -> None:
        self._query_string = None
        self._args = value

    @property
    def server_name(self) -> str:
        """The server name (read-only, use :attr:`host` to set)"""
        return self.host.split(":", 1)[0]

    @property
    def server_port(self) -> int:
        """The server port as integer (read-only, use :attr:`host` to set)"""
        pieces = self.host.split(":", 1)

        if len(pieces) == 2:
            try:
                return int(pieces[1])
            except ValueError:
                # Not a numeric port; fall back to the scheme default below.
                pass

        if self.url_scheme == "https":
            return 443
        return 80

    def __del__(self) -> None:
        """Close the files when the builder is garbage collected."""
        self.close()

    def __enter__(self) -> te.Self:
        """Return the builder itself for use in a ``with`` block."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the files when leaving the ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close all open files in :attr:`files`. :attr:`input_stream` is not
        closed, as it is assumed to be managed externally.
        """
        self._files.close()

    def get_environ(self) -> WSGIEnvironment:
        """Return the built environ.

        .. versionchanged:: 0.15
            The content type and length headers are set based on
            input stream detection. Previously this only set the WSGI
            keys.
        """
        input_stream = self.input_stream
        content_length = self.content_length

        mimetype = self.mimetype
        content_type = self.content_type

        if input_stream is not None:
            # Measure the bytes remaining from the current position, then
            # restore the position so the application reads from there.
            start_pos = input_stream.tell()
            input_stream.seek(0, 2)
            end_pos = input_stream.tell()
            input_stream.seek(start_pos)
            content_length = end_pos - start_pos
        elif mimetype == "multipart/form-data":
            input_stream, content_length, boundary = stream_encode_multipart(
                CombinedMultiDict([self.form, self.files])
            )
            content_type = f'{mimetype}; boundary="{boundary}"'
        elif mimetype == "application/x-www-form-urlencoded":
            form_encoded = _urlencode(self.form).encode("ascii")
            content_length = len(form_encoded)
            input_stream = BytesIO(form_encoded)
        else:
            input_stream = BytesIO()

        result: WSGIEnvironment = {}
        if self.environ_base:
            result.update(self.environ_base)

        def _path_encode(x: str) -> str:
            return _wsgi_encoding_dance(unquote(x))

        raw_uri = _wsgi_encoding_dance(self.request_uri)
        result.update(
            {
                "REQUEST_METHOD": self.method,
                "SCRIPT_NAME": _path_encode(self.script_root),
                "PATH_INFO": _path_encode(self.path),
                "QUERY_STRING": _wsgi_encoding_dance(self.query_string),
                # Non-standard, added by mod_wsgi, uWSGI
                "REQUEST_URI": raw_uri,
                # Non-standard, added by gunicorn
                "RAW_URI": raw_uri,
                "SERVER_NAME": self.server_name,
                "SERVER_PORT": str(self.server_port),
                "HTTP_HOST": self.host,
                "SERVER_PROTOCOL": self.server_protocol,
                "wsgi.version": self.wsgi_version,
                "wsgi.url_scheme": self.url_scheme,
                "wsgi.input": input_stream,
                "wsgi.errors": self.errors_stream,
                "wsgi.multithread": self.multithread,
                "wsgi.multiprocess": self.multiprocess,
                "wsgi.run_once": self.run_once,
            }
        )

        headers = self.headers.copy()
        # Don't send these as headers, they're part of the environ.
        headers.remove("Content-Type")
        headers.remove("Content-Length")

        if content_type is not None:
            result["CONTENT_TYPE"] = content_type

        if content_length is not None:
            result["CONTENT_LENGTH"] = str(content_length)

        combined_headers = defaultdict(list)

        for key, value in headers.to_wsgi_list():
            combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)

        # Repeated headers are folded into one comma separated value.
        for key, values in combined_headers.items():
            result[key] = ", ".join(values)

        if self.environ_overrides:
            result.update(self.environ_overrides)

        return result

    def get_request(self, cls: type[Request] | None = None) -> Request:
        """Returns a request with the data. If the request class is not
        specified :attr:`request_class` is used.

        :param cls: The request wrapper to use.
        """
        if cls is None:
            cls = self.request_class

        return cls(self.get_environ())

767 

768 

class ClientRedirectError(Exception):
    """If a redirect loop is detected when using follow_redirects=True with
    the :class:`Client`, then this exception is raised.
    """

773 

774 

775class Client: 

776 """Simulate sending requests to a WSGI application without running a WSGI or HTTP 

777 server. 

778 

779 :param application: The WSGI application to make requests to. 

780 :param response_wrapper: A :class:`.Response` class to wrap response data with. 

781 Defaults to :class:`.TestResponse`. If it's not a subclass of ``TestResponse``, 

782 one will be created. 

783 :param use_cookies: Persist cookies from ``Set-Cookie`` response headers to the 

784 ``Cookie`` header in subsequent requests. Domain and path matching is supported, 

785 but other cookie parameters are ignored. 

786 :param allow_subdomain_redirects: Allow requests to follow redirects to subdomains. 

787 Enable this if the application handles subdomains and redirects between them. 

788 

789 .. versionchanged:: 2.3 

790 Simplify cookie implementation, support domain and path matching. 

791 

792 .. versionchanged:: 2.1 

793 All data is available as properties on the returned response object. The 

794 response cannot be returned as a tuple. 

795 

796 .. versionchanged:: 2.0 

797 ``response_wrapper`` is always a subclass of :class:``TestResponse``. 

798 

799 .. versionchanged:: 0.5 

800 Added the ``use_cookies`` parameter. 

801 """ 

802 

803 def __init__( 

804 self, 

805 application: WSGIApplication, 

806 response_wrapper: type[Response] | None = None, 

807 use_cookies: bool = True, 

808 allow_subdomain_redirects: bool = False, 

809 ) -> None: 

810 self.application = application 

811 

812 if response_wrapper in {None, Response}: 

813 response_wrapper = TestResponse 

814 elif response_wrapper is not None and not issubclass( 

815 response_wrapper, TestResponse 

816 ): 

817 response_wrapper = type( 

818 "WrapperTestResponse", 

819 (TestResponse, response_wrapper), 

820 {}, 

821 ) 

822 

823 self.response_wrapper = t.cast(type["TestResponse"], response_wrapper) 

824 

825 if use_cookies: 

826 self._cookies: dict[tuple[str, str, str], Cookie] | None = {} 

827 else: 

828 self._cookies = None 

829 

830 self.allow_subdomain_redirects = allow_subdomain_redirects 

831 

832 def get_cookie( 

833 self, key: str, domain: str = "localhost", path: str = "/" 

834 ) -> Cookie | None: 

835 """Return a :class:`.Cookie` if it exists. Cookies are uniquely identified by 

836 ``(domain, path, key)``. 

837 

838 :param key: The decoded form of the key for the cookie. 

839 :param domain: The domain the cookie was set for. 

840 :param path: The path the cookie was set for. 

841 

842 .. versionadded:: 2.3 

843 """ 

844 if self._cookies is None: 

845 raise TypeError( 

846 "Cookies are disabled. Create a client with 'use_cookies=True'." 

847 ) 

848 

849 return self._cookies.get((domain, path, key)) 

850 

def set_cookie(
    self,
    key: str,
    value: str = "",
    *,
    domain: str = "localhost",
    origin_only: bool = True,
    path: str = "/",
    **kwargs: t.Any,
) -> None:
    """Store a cookie so that it is sent with subsequent requests.

    This avoids having to make a test request to a route whose only
    purpose is to set the cookie. To check the cookie, make a test
    request to a route that reads its value.

    Only ``domain``, ``origin_only``, and ``path`` are used by the client
    when deciding which cookies to send with a request. Other cookie
    parameters that browsers act on are not applicable in tests.

    :param key: The key part of the cookie.
    :param value: The value part of the cookie.
    :param domain: Send this cookie with requests that match this domain.
        If ``origin_only`` is true, it must be an exact match, otherwise
        it may be a suffix match.
    :param origin_only: Whether the domain must be an exact match to the
        request.
    :param path: Send this cookie with requests that match this path
        either exactly or as a prefix.
    :param kwargs: Passed to :func:`.dump_cookie`.
    :raises TypeError: If the client was created with cookies disabled.

    .. versionchanged:: 3.0
        The parameter ``server_name`` is removed. The first parameter is
        ``key``. Use the ``domain`` and ``origin_only`` parameters instead.

    .. versionchanged:: 2.3
        The ``origin_only`` parameter was added.

    .. versionchanged:: 2.3
        The ``domain`` parameter defaults to ``localhost``.
    """
    jar = self._cookies

    if jar is None:
        raise TypeError(
            "Cookies are disabled. Create a client with 'use_cookies=True'."
        )

    # Round-trip through a ``Set-Cookie`` header so the stored cookie is
    # built the same way as one received from a response.
    header = dump_cookie(key, value, domain=domain, path=path, **kwargs)
    cookie = Cookie._from_response_header(domain, "/", header)
    # The header always carries a domain here, so the exact-match flag is
    # set from the argument rather than derived from the header.
    cookie.origin_only = origin_only
    storage_key = cookie._storage_key

    if cookie._should_delete:
        jar.pop(storage_key, None)
    else:
        jar[storage_key] = cookie

905 

def delete_cookie(
    self,
    key: str,
    *,
    domain: str = "localhost",
    path: str = "/",
) -> None:
    """Remove a stored cookie, doing nothing if it is not present.

    A cookie is uniquely identified by ``(domain, path, key)``.

    :param key: The decoded form of the key for the cookie.
    :param domain: The domain the cookie was set for.
    :param path: The path the cookie was set for.
    :raises TypeError: If the client was created with cookies disabled.

    .. versionchanged:: 3.0
        The ``server_name`` parameter is removed. The first parameter is
        ``key``. Use the ``domain`` parameter instead.

    .. versionchanged:: 3.0
        The ``secure``, ``httponly`` and ``samesite`` parameters are removed.

    .. versionchanged:: 2.3
        The ``domain`` parameter defaults to ``localhost``.
    """
    jar = self._cookies

    if jar is None:
        raise TypeError(
            "Cookies are disabled. Create a client with 'use_cookies=True'."
        )

    storage_key = (domain, path, key)
    jar.pop(storage_key, None)

936 

def _add_cookies_to_wsgi(self, environ: WSGIEnvironment) -> None:
    """Set ``HTTP_COOKIE`` in ``environ`` from the stored cookies that match
    the request host and path. Does nothing if cookies are disabled.

    If no stored cookie matches, any existing ``HTTP_COOKIE`` entry is
    removed from ``environ``.

    :meta private:

    .. versionadded:: 2.3
    """
    jar = self._cookies

    if jar is None:
        return

    url = urlsplit(get_current_url(environ))
    host = url.hostname or "localhost"
    pairs = [
        cookie._to_request_header()
        for cookie in jar.values()
        if cookie._matches_request(host, url.path)
    ]
    header_value = "; ".join(pairs)

    if not header_value:
        environ.pop("HTTP_COOKIE", None)
    else:
        environ["HTTP_COOKIE"] = header_value

960 

def _update_cookies_from_response(
    self, server_name: str, path: str, headers: list[str]
) -> None:
    """Apply each ``Set-Cookie`` header value from a response to the stored
    cookies. Does nothing if cookies are disabled.

    A cookie that is flagged for deletion is removed from storage,
    otherwise it is stored, replacing any cookie with the same storage key.

    :meta private:

    .. versionadded:: 2.3
    """
    jar = self._cookies

    if jar is None:
        return

    for raw_header in headers:
        cookie = Cookie._from_response_header(server_name, path, raw_header)
        storage_key = cookie._storage_key

        if not cookie._should_delete:
            jar[storage_key] = cookie
            continue

        jar.pop(storage_key, None)

981 

def run_wsgi_app(
    self, environ: WSGIEnvironment, buffered: bool = False
) -> tuple[t.Iterable[bytes], str, Headers]:
    """Run the wrapped WSGI app with the given environ, adding stored cookies
    to the request and recording cookies set by the response.

    :meta private:
    """
    self._add_cookies_to_wsgi(environ)
    result = run_wsgi_app(self.application, environ, buffered=buffered)
    response_headers = result[2]

    url = urlsplit(get_current_url(environ))
    host = url.hostname or "localhost"
    self._update_cookies_from_response(
        host, url.path, response_headers.getlist("Set-Cookie")
    )
    return result

996 

def resolve_redirect(
    self, response: TestResponse, buffered: bool = False
) -> TestResponse:
    """Make a new request to the location named by a redirect response.

    The new request is built from the environ of the request that produced
    ``response``, with the path and query string taken from the redirect
    location.

    :raises RuntimeError: If the location points at a different server
        name, or at a subdomain while ``allow_subdomain_redirects`` is
        false.

    :meta private:
    """
    target = urlsplit(response.location)
    builder = EnvironBuilder.from_environ(
        response.request.environ, path=target.path, query_string=target.query
    )

    to_name_parts = target.netloc.split(":", 1)[0].split(".")
    from_name_parts = builder.server_name.split(".")

    if to_name_parts == [""]:
        # A local redirect with autocorrect_location_header=False
        # doesn't have a host, so use the request's host.
        to_name_parts = from_name_parts
    else:
        # The new location has a host, use it for the base URL.
        builder.url_scheme = target.scheme
        builder.host = target.netloc

    # Explain why a redirect to a different server name won't be followed.
    if to_name_parts != from_name_parts:
        is_subdomain = to_name_parts[-len(from_name_parts) :] == from_name_parts

        if not is_subdomain:
            raise RuntimeError("Following external redirects is not supported.")

        if not self.allow_subdomain_redirects:
            raise RuntimeError("Following subdomain redirects is not enabled.")

    path = target.path
    path_parts = path.split("/")
    root_parts = builder.script_root.split("/")

    if path_parts[: len(root_parts)] != root_parts:
        # The new location is not under the script root, so use the
        # whole path and clear the previous root.
        builder.path = path
        builder.script_root = ""
    else:
        # Strip the script root from the path.
        builder.path = path[len(builder.script_root) :]

    # Certain statuses switch to GET in some cases
    # https://fetch.spec.whatwg.org/#http-redirect-fetch
    status_code = response.status_code
    switch_to_get = (status_code in {301, 302} and builder.method == "POST") or (
        status_code == 303 and builder.method not in {"GET", "HEAD"}
    )

    if switch_to_get:
        builder.method = "GET"

        if builder.input_stream is not None:
            builder.input_stream.close()

        builder.input_stream = None  # also closes and clears form and files
        builder.content_type = None
        builder.content_length = None

        # The body is dropped, so drop the headers that described it.
        for body_header in (
            "Content-Encoding",
            "Content-Language",
            "Content-Location",
            "Transfer-Encoding",
        ):
            builder.headers.pop(body_header, None)

    return self.open(builder, buffered=buffered)

1061 

def open(
    self,
    *args: t.Any,
    buffered: bool = False,
    follow_redirects: bool = False,
    **kwargs: t.Any,
) -> TestResponse:
    """Build an environ from the given arguments, run the application with
    it, and return the response.

    :param args: Passed to :class:`EnvironBuilder` to create the
        environ for the request. If a single arg is passed, it can
        be an existing :class:`EnvironBuilder` or an environ dict.
    :param buffered: Convert the iterator returned by the app into
        a list. If the iterator has a ``close()`` method, it is
        called automatically.
    :param follow_redirects: Make additional requests to follow HTTP
        redirects until a non-redirect status is returned.
        :attr:`TestResponse.history` lists the intermediate
        responses.
    :raises ClientRedirectError: If following redirects leads to a
        location and status that was already followed.

    .. versionchanged:: 2.1
        Removed the ``as_tuple`` parameter.

    .. versionchanged:: 2.0
        The request input stream is closed when calling
        ``response.close()``. Input streams for redirects are
        automatically closed.

    .. versionchanged:: 0.5
        If a dict is provided as file in the dict for the ``data``
        parameter the content type has to be called ``content_type``
        instead of ``mimetype``. This change was made for
        consistency with :class:`werkzeug.FileWrapper`.

    .. versionchanged:: 0.5
        Added the ``follow_redirects`` parameter.
    """
    request: Request | None = None

    # A lone positional argument may already describe the whole request.
    if len(args) == 1 and not kwargs:
        arg = args[0]

        if isinstance(arg, EnvironBuilder):
            request = arg.get_request()
        elif isinstance(arg, dict):
            with EnvironBuilder.from_environ(arg) as builder:
                request = builder.get_request()
        elif isinstance(arg, Request):
            request = arg

    # Otherwise treat the arguments as EnvironBuilder parameters.
    if request is None:
        with EnvironBuilder(*args, **kwargs) as builder:
            request = builder.get_request()

    response = self.response_wrapper(
        *self.run_wsgi_app(request.environ, buffered=buffered), request=request
    )

    if not follow_redirects:
        return response

    seen_redirects = set()
    history: list[TestResponse] = []

    while response.status_code in {301, 302, 303, 307, 308}:
        # Exhaust intermediate response bodies to ensure middleware
        # that returns an iterator runs any cleanup code.
        if not buffered:
            response.make_sequence()
            response.close()

        redirect_entry = (response.location, response.status_code)

        if redirect_entry in seen_redirects:
            raise ClientRedirectError(
                f"Loop detected: A {response.status_code} redirect"
                f" to {response.location} was already made."
            )

        seen_redirects.add(redirect_entry)
        # Each intermediate response records the responses before it.
        response.history = tuple(history)
        history.append(response)
        response = self.resolve_redirect(response, buffered=buffered)

    # This is the final request after redirects.
    response.history = tuple(history)
    # Close the input stream when closing the response, in case
    # the input is an open temporary file.
    response.call_on_close(request.input_stream.close)
    return response

1152 

def get(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``GET`` request. Arguments are passed to :meth:`open`; any
    ``method`` given is replaced.
    """
    return self.open(*args, **{**kw, "method": "GET"})

1157 

def post(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``POST`` request. Arguments are passed to :meth:`open`; any
    ``method`` given is replaced.
    """
    return self.open(*args, **{**kw, "method": "POST"})

1162 

def put(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``PUT`` request. Arguments are passed to :meth:`open`; any
    ``method`` given is replaced.
    """
    return self.open(*args, **{**kw, "method": "PUT"})

1167 

def delete(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``DELETE`` request. Arguments are passed to :meth:`open`; any
    ``method`` given is replaced.
    """
    return self.open(*args, **{**kw, "method": "DELETE"})

1172 

def patch(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``PATCH`` request. Arguments are passed to :meth:`open`; any
    ``method`` given is replaced.
    """
    return self.open(*args, **{**kw, "method": "PATCH"})

1177 

def options(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make an ``OPTIONS`` request. Arguments are passed to :meth:`open`;
    any ``method`` given is replaced.
    """
    return self.open(*args, **{**kw, "method": "OPTIONS"})

1182 

def head(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``HEAD`` request. Arguments are passed to :meth:`open`; any
    ``method`` given is replaced.
    """
    return self.open(*args, **{**kw, "method": "HEAD"})

1187 

def trace(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``TRACE`` request. Arguments are passed to :meth:`open`; any
    ``method`` given is replaced.
    """
    return self.open(*args, **{**kw, "method": "TRACE"})

1192 

1193 def __repr__(self) -> str: 

1194 return f"<{type(self).__name__} {self.application!r}>" 

1195 

1196 

def create_environ(*args: t.Any, **kwargs: t.Any) -> WSGIEnvironment:
    """Create a new WSGI environ dict from the values passed.

    All positional and keyword arguments are handed to the
    :class:`EnvironBuilder` constructor, so this accepts exactly what
    that constructor accepts. The builder is used as a context manager
    and is therefore exited once the environ has been produced.

    .. versionchanged:: 0.5
        This function is now a thin wrapper over :class:`EnvironBuilder` which
        was added in 0.5. The `headers`, `environ_base`, `environ_overrides`
        and `charset` parameters were added.
    """
    with EnvironBuilder(*args, **kwargs) as builder:
        environ = builder.get_environ()

    return environ

1214 

1215 

def run_wsgi_app(
    app: WSGIApplication, environ: WSGIEnvironment, buffered: bool = False
) -> tuple[t.Iterable[bytes], str, Headers]:
    """Call a WSGI application and return ``(app_iter, status, headers)``.

    This works best with applications that always return an iterator.
    Applications that instead use the `write()` callable returned by
    `start_response` are handled on a best-effort basis. If the output
    is not what you expect, set `buffered` to `True` to enforce
    buffering.

    If passed an invalid WSGI application the behavior of this function is
    undefined. Never pass non-conforming WSGI applications to this function.

    :param app: the application to execute.
    :param environ: the WSGI environ for the request. The application
        receives a copy.
    :param buffered: set to `True` to enforce buffering.
    :return: tuple in the form ``(app_iter, status, headers)``
    """
    # Copy environ to ensure any mutations by the app (ProxyFix, for
    # example) don't affect subsequent requests (such as redirects).
    environ = _get_environ(environ).copy()
    started: tuple[str, list[tuple[str, str]]] | None = None
    written: list[bytes] = []

    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal started

        if exc_info:
            try:
                raise exc_info[1].with_traceback(exc_info[2])
            finally:
                # Drop the reference to the traceback.
                exc_info = None

        started = (status, headers)
        # The ``write()`` callable collects data into the shared buffer.
        return written.append

    app_rv = app(environ, start_response)
    close_func = getattr(app_rv, "close", None)
    app_iter: t.Iterable[bytes] = iter(app_rv)

    if buffered:
        # When buffering, consume the whole iterator into a list and call
        # close right away.
        # NOTE(review): data passed to ``write()`` is collected in
        # ``written`` but is not part of this list -- confirm intended.
        try:
            app_iter = list(app_iter)
        finally:
            if close_func is not None:
                close_func()
    else:
        # Otherwise iterate only until the app has started the response,
        # then chain what was collected so far in front of the rest.
        for chunk in app_iter:
            written.append(chunk)

            if started is not None:
                break

        if written:
            app_iter = chain(written, app_iter)

        # Restore the original ``close`` if the returned iterable is no
        # longer the object the app returned.
        if close_func is not None and app_iter is not app_rv:
            app_iter = ClosingIterator(app_iter, close_func)

    status, headers = started  # type: ignore
    return app_iter, status, Headers(headers)

1286 

1287 

class TestResponse(Response):
    """:class:`~werkzeug.wrappers.Response` subclass that carries extra
    information about requests made with the test :class:`Client`.

    Test client requests will always return an instance of this class.
    If a custom response class is passed to the client, it is
    subclassed along with this to support test information.

    If the test request included large files, or if the application is
    serving a file, call :meth:`close` to close any open files and
    prevent Python showing a ``ResourceWarning``.

    .. versionchanged:: 2.2
        Set the ``default_mimetype`` to None to prevent a mimetype being
        assumed if missing.

    .. versionchanged:: 2.1
        Response instances cannot be treated as tuples.

    .. versionadded:: 2.0
        Test client methods always return instances of this class.
    """

    # Don't assume a mimetype, instead use whatever the response provides.
    default_mimetype = None

    request: Request
    """A request object with the environ used to make the request that
    resulted in this response.
    """

    history: tuple[TestResponse, ...]
    """The intermediate responses, as a tuple. Populated when the test
    request is made with ``follow_redirects`` enabled.
    """

    # Tell Pytest to ignore this, it's not a test class.
    __test__ = False

    def __init__(
        self,
        response: t.Iterable[bytes],
        status: str,
        headers: Headers,
        request: Request,
        history: tuple[TestResponse] = (),  # type: ignore
        **kwargs: t.Any,
    ) -> None:
        """Create the response and attach the test information.

        :param response: The body iterable, passed to the base class.
        :param status: The status, passed to the base class.
        :param headers: The headers, passed to the base class.
        :param request: The request that resulted in this response.
        :param history: The intermediate redirect responses.
        :param kwargs: Passed to the base class.
        """
        super().__init__(response, status, headers, **kwargs)
        self._compat_tuple = response, status, headers
        self.history = history
        self.request = request

    @cached_property
    def text(self) -> str:
        """The response data as text. A shortcut for
        ``response.get_data(as_text=True)``.

        .. versionadded:: 2.1
        """
        return self.get_data(as_text=True)

1349 

1350 

@dataclasses.dataclass
class Cookie:
    """A cookie key, value, and parameters.

    The class itself is not a public API. Its attributes are documented for inspection
    with :meth:`.Client.get_cookie` only.

    .. versionadded:: 2.3
    """

    key: str
    """The cookie key, encoded as a client would see it."""

    value: str
    """The cookie value, encoded as a client would see it."""

    decoded_key: str
    """The cookie key, decoded as the application would set and see it."""

    decoded_value: str
    """The cookie value, decoded as the application would set and see it."""

    expires: datetime | None
    """The time at which the cookie is no longer valid."""

    max_age: int | None
    """The number of seconds from when the cookie was set at which it is
    no longer valid.
    """

    domain: str
    """The domain that the cookie was set for, or the request domain if not set."""

    origin_only: bool
    """Whether the cookie will be sent for exact domain matches only. This is ``True``
    if the ``Domain`` parameter was not present.
    """

    path: str
    """The path that the cookie was set for."""

    secure: bool | None
    """The ``Secure`` parameter."""

    http_only: bool | None
    """The ``HttpOnly`` parameter."""

    same_site: str | None
    """The ``SameSite`` parameter."""

    def _matches_request(self, server_name: str, path: str) -> bool:
        """Whether this cookie applies to a request for the given host and path.

        The host matches if it equals :attr:`domain`, or, when
        :attr:`origin_only` is false, if it ends with ``"." + domain``. The
        path matches if it equals :attr:`path` or lies below it at a ``/``
        boundary.
        """
        return (
            server_name == self.domain
            or (
                not self.origin_only
                and server_name.endswith(self.domain)
                # The part in front of the domain must end at a label
                # boundary, so "badexample.com" does not match "example.com".
                and server_name[: -len(self.domain)].endswith(".")
            )
        ) and (
            path == self.path
            or (
                path.startswith(self.path)
                # The remainder must begin with "/". When the cookie path
                # itself ends with "/", step back one character so that
                # slash is the one that is checked.
                and path[len(self.path) - self.path.endswith("/") :].startswith("/")
            )
        )

    def _to_request_header(self) -> str:
        """The encoded ``key=value`` pair to place in a ``Cookie`` header."""
        return f"{self.key}={self.value}"

    @classmethod
    def _from_response_header(cls, server_name: str, path: str, header: str) -> te.Self:
        """Build a cookie from one ``Set-Cookie`` header value.

        :param server_name: The request host, used as the domain when the
            header has no ``Domain`` parameter.
        :param path: The request path, used to derive the cookie path when
            the header has no ``Path`` parameter.
        :param header: The ``Set-Cookie`` header value.
        """
        header, _, parameters_str = header.partition(";")
        key, _, value = header.partition("=")
        decoded_key, decoded_value = next(parse_cookie(header).items())  # type: ignore[call-overload]
        params = {}

        for item in parameters_str.split(";"):
            k, sep, v = item.partition("=")
            name = k.strip().lower()

            if not name:
                # An empty segment (no parameters at all, or a stray ";")
                # is not a parameter.
                continue

            # Parameter names are compared case-insensitively. A parameter
            # without "=" is a flag and is stored as None.
            params[name] = v.strip() if sep else None

        return cls(
            key=key.strip(),
            value=value.strip(),
            decoded_key=decoded_key,
            decoded_value=decoded_value,
            expires=parse_date(params.get("expires")),
            max_age=int(params["max-age"] or 0) if "max-age" in params else None,
            domain=params.get("domain") or server_name,
            origin_only="domain" not in params,
            # Without a Path parameter, use the request path up to its last
            # "/" (or "/" if that is empty).
            path=params.get("path") or path.rpartition("/")[0] or "/",
            secure="secure" in params,
            http_only="httponly" in params,
            same_site=params.get("samesite"),
        )

    @property
    def _storage_key(self) -> tuple[str, str, str]:
        """The ``(domain, path, decoded_key)`` tuple identifying this cookie."""
        return self.domain, self.path, self.decoded_key

    @property
    def _should_delete(self) -> bool:
        """Whether the cookie should be removed rather than stored: its
        ``max_age`` is 0 or its ``expires`` time is the Unix epoch.
        """
        return self.max_age == 0 or (
            self.expires is not None and self.expires.timestamp() == 0
        )