Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/test.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

617 statements  

1from __future__ import annotations 

2 

3import dataclasses 

4import mimetypes 

5import sys 

6import typing as t 

7from collections import defaultdict 

8from datetime import datetime 

9from io import BytesIO 

10from itertools import chain 

11from random import random 

12from tempfile import TemporaryFile 

13from time import time 

14from urllib.parse import unquote 

15from urllib.parse import urlsplit 

16from urllib.parse import urlunsplit 

17 

18from ._internal import _get_environ 

19from ._internal import _wsgi_decoding_dance 

20from ._internal import _wsgi_encoding_dance 

21from .datastructures import Authorization 

22from .datastructures import CallbackDict 

23from .datastructures import CombinedMultiDict 

24from .datastructures import EnvironHeaders 

25from .datastructures import FileMultiDict 

26from .datastructures import Headers 

27from .datastructures import MultiDict 

28from .http import dump_cookie 

29from .http import dump_options_header 

30from .http import parse_cookie 

31from .http import parse_date 

32from .http import parse_options_header 

33from .sansio.multipart import Data 

34from .sansio.multipart import Epilogue 

35from .sansio.multipart import Field 

36from .sansio.multipart import File 

37from .sansio.multipart import MultipartEncoder 

38from .sansio.multipart import Preamble 

39from .urls import _urlencode 

40from .urls import iri_to_uri 

41from .utils import cached_property 

42from .utils import get_content_type 

43from .wrappers.request import Request 

44from .wrappers.response import Response 

45from .wsgi import ClosingIterator 

46from .wsgi import get_current_url 

47 

48if t.TYPE_CHECKING: 

49 import typing_extensions as te 

50 from _typeshed.wsgi import WSGIApplication 

51 from _typeshed.wsgi import WSGIEnvironment 

52 

53 

54def stream_encode_multipart( 

55 data: t.Mapping[str, t.Any], 

56 use_tempfile: bool = True, 

57 threshold: int = 1024 * 500, 

58 boundary: str | None = None, 

59) -> tuple[t.IO[bytes], int, str]: 

60 """Encode a dict of values (either strings or file descriptors or 

61 :class:`FileStorage` objects.) into a multipart encoded string stored 

62 in a file descriptor. 

63 

64 .. versionchanged:: 3.0 

65 The ``charset`` parameter was removed. 

66 """ 

67 if boundary is None: 

68 boundary = f"---------------WerkzeugFormPart_{time()}{random()}" 

69 

70 stream: t.IO[bytes] = BytesIO() 

71 total_length = 0 

72 on_disk = False 

73 write_binary: t.Callable[[bytes], int] 

74 

75 if use_tempfile: 

76 

77 def write_binary(s: bytes) -> int: 

78 nonlocal stream, total_length, on_disk 

79 

80 if on_disk: 

81 return stream.write(s) 

82 else: 

83 length = len(s) 

84 

85 if length + total_length <= threshold: 

86 stream.write(s) 

87 else: 

88 new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+")) 

89 new_stream.write(stream.getvalue()) # type: ignore 

90 new_stream.write(s) 

91 stream = new_stream 

92 on_disk = True 

93 

94 total_length += length 

95 return length 

96 

97 else: 

98 write_binary = stream.write 

99 

100 encoder = MultipartEncoder(boundary.encode()) 

101 write_binary(encoder.send_event(Preamble(data=b""))) 

102 for key, value in _iter_data(data): 

103 reader = getattr(value, "read", None) 

104 if reader is not None: 

105 filename = getattr(value, "filename", getattr(value, "name", None)) 

106 content_type = getattr(value, "content_type", None) 

107 if content_type is None: 

108 content_type = ( 

109 filename 

110 and mimetypes.guess_type(filename)[0] 

111 or "application/octet-stream" 

112 ) 

113 headers = value.headers 

114 headers.update([("Content-Type", content_type)]) 

115 if filename is None: 

116 write_binary(encoder.send_event(Field(name=key, headers=headers))) 

117 else: 

118 write_binary( 

119 encoder.send_event( 

120 File(name=key, filename=filename, headers=headers) 

121 ) 

122 ) 

123 while True: 

124 chunk = reader(16384) 

125 

126 if not chunk: 

127 write_binary(encoder.send_event(Data(data=chunk, more_data=False))) 

128 break 

129 

130 write_binary(encoder.send_event(Data(data=chunk, more_data=True))) 

131 else: 

132 if not isinstance(value, str): 

133 value = str(value) 

134 write_binary(encoder.send_event(Field(name=key, headers=Headers()))) 

135 write_binary(encoder.send_event(Data(data=value.encode(), more_data=False))) 

136 

137 write_binary(encoder.send_event(Epilogue(data=b""))) 

138 

139 length = stream.tell() 

140 stream.seek(0) 

141 return stream, length, boundary 

142 

143 

144def encode_multipart( 

145 values: t.Mapping[str, t.Any], boundary: str | None = None 

146) -> tuple[str, bytes]: 

147 """Like `stream_encode_multipart` but returns a tuple in the form 

148 (``boundary``, ``data``) where data is bytes. 

149 

150 .. versionchanged:: 3.0 

151 The ``charset`` parameter was removed. 

152 """ 

153 stream, length, boundary = stream_encode_multipart( 

154 values, use_tempfile=False, boundary=boundary 

155 ) 

156 return boundary, stream.read() 

157 

158 

159def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[tuple[str, t.Any]]: 

160 """Iterate over a mapping that might have a list of values, yielding 

161 all key, value pairs. Almost like iter_multi_items but only allows 

162 lists, not tuples, of values so tuples can be used for files. 

163 """ 

164 if isinstance(data, MultiDict): 

165 yield from data.items(multi=True) 

166 else: 

167 for key, value in data.items(): 

168 if isinstance(value, list): 

169 for v in value: 

170 yield key, v 

171 else: 

172 yield key, value 

173 

174 

175_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound="MultiDict[t.Any, t.Any]") 

176 

177 

178class EnvironBuilder: 

179 """This class can be used to conveniently create a WSGI environment 

180 for testing purposes. It can be used to quickly create WSGI environments 

181 or request objects from arbitrary data. 

182 

183 The signature of this class is also used in some other places as of 

184 Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`, 

185 :meth:`Client.open`). Because of this most of the functionality is 

186 available through the constructor alone. 

187 

188 Files and regular form data can be manipulated independently of each 

189 other with the :attr:`form` and :attr:`files` attributes, but are 

190 passed with the same argument to the constructor: `data`. 

191 

192 `data` can be any of these values: 

193 

194 - a `str` or `bytes` object: The object is converted into an 

195 :attr:`input_stream`, the :attr:`content_length` is set and you have to 

196 provide a :attr:`content_type`. 

197 - a `dict` or :class:`MultiDict`: The keys have to be strings. The values 

198 have to be either any of the following objects, or a list of any of the 

199 following objects: 

200 

201 - a :class:`file`-like object: These are converted into 

202 :class:`FileStorage` objects automatically. 

203 - a `tuple`: The :meth:`~FileMultiDict.add_file` method is called 

204 with the key and the unpacked `tuple` items as positional 

205 arguments. 

206 - a `str`: The string is set as form data for the associated key. 

207 - a file-like object: The object content is loaded in memory and then 

208 handled like a regular `str` or a `bytes`. 

209 

210 :param path: the path of the request. In the WSGI environment this will 

211 end up as `PATH_INFO`. If the `query_string` is not defined 

212 and there is a question mark in the `path` everything after 

213 it is used as query string. 

214 :param base_url: the base URL is a URL that is used to extract the WSGI 

215 URL scheme, host (server name + server port) and the 

216 script root (`SCRIPT_NAME`). 

217 :param query_string: an optional string or dict with URL parameters. 

218 :param method: the HTTP method to use, defaults to `GET`. 

219 :param input_stream: an optional input stream. Do not specify this and 

220 `data`. As soon as an input stream is set you can't 

221 modify :attr:`args` and :attr:`files` unless you 

222 set the :attr:`input_stream` to `None` again. 

223 :param content_type: The content type for the request. As of 0.5 you 

224 don't have to provide this when specifying files 

225 and form data via `data`. 

226 :param content_length: The content length for the request. You don't 

227 have to specify this when providing data via 

228 `data`. 

229 :param errors_stream: an optional error stream that is used for 

230 `wsgi.errors`. Defaults to :data:`stderr`. 

231 :param multithread: controls `wsgi.multithread`. Defaults to `False`. 

232 :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`. 

233 :param run_once: controls `wsgi.run_once`. Defaults to `False`. 

234 :param headers: an optional list or :class:`Headers` object of headers. 

235 :param data: a string or dict of form data or a file-object. 

236 See explanation above. 

237 :param json: An object to be serialized and assigned to ``data``. 

238 Defaults the content type to ``"application/json"``. 

239 Serialized with the function assigned to :attr:`json_dumps`. 

240 :param environ_base: an optional dict of environment defaults. 

241 :param environ_overrides: an optional dict of environment overrides. 

242 :param auth: An authorization object to use for the 

243 ``Authorization`` header value. A ``(username, password)`` tuple 

244 is a shortcut for ``Basic`` authorization. 

245 

246 .. versionchanged:: 3.0 

247 The ``charset`` parameter was removed. 

248 

249 .. versionchanged:: 2.1 

250 ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as 

251 header keys in the environ. 

252 

253 .. versionchanged:: 2.0 

254 ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including 

255 the query string, not only the path. 

256 

257 .. versionchanged:: 2.0 

258 The default :attr:`request_class` is ``Request`` instead of 

259 ``BaseRequest``. 

260 

261 .. versionadded:: 2.0 

262 Added the ``auth`` parameter. 

263 

264 .. versionadded:: 0.15 

265 The ``json`` param and :meth:`json_dumps` method. 

266 

267 .. versionadded:: 0.15 

268 The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing 

269 the path before percent-decoding. This is not part of the WSGI 

270 PEP, but many WSGI servers include it. 

271 

272 .. versionchanged:: 0.6 

273 ``path`` and ``base_url`` can now be unicode strings that are 

274 encoded with :func:`iri_to_uri`. 

275 """ 

276 

277 #: the server protocol to use. defaults to HTTP/1.1 

278 server_protocol = "HTTP/1.1" 

279 

280 #: the wsgi version to use. defaults to (1, 0) 

281 wsgi_version = (1, 0) 

282 

283 #: The default request class used by :meth:`get_request`. 

284 request_class = Request 

285 

286 import json 

287 

288 #: The serialization function used when ``json`` is passed. 

289 json_dumps = staticmethod(json.dumps) 

290 del json 

291 

292 _args: MultiDict[str, str] | None 

293 _query_string: str | None 

294 _input_stream: t.IO[bytes] | None 

295 _form: MultiDict[str, str] | None 

296 _files: FileMultiDict | None 

297 

298 def __init__( 

299 self, 

300 path: str = "/", 

301 base_url: str | None = None, 

302 query_string: t.Mapping[str, str] | str | None = None, 

303 method: str = "GET", 

304 input_stream: t.IO[bytes] | None = None, 

305 content_type: str | None = None, 

306 content_length: int | None = None, 

307 errors_stream: t.IO[str] | None = None, 

308 multithread: bool = False, 

309 multiprocess: bool = False, 

310 run_once: bool = False, 

311 headers: Headers | t.Iterable[tuple[str, str]] | None = None, 

312 data: None | (t.IO[bytes] | str | bytes | t.Mapping[str, t.Any]) = None, 

313 environ_base: t.Mapping[str, t.Any] | None = None, 

314 environ_overrides: t.Mapping[str, t.Any] | None = None, 

315 mimetype: str | None = None, 

316 json: t.Mapping[str, t.Any] | None = None, 

317 auth: Authorization | tuple[str, str] | None = None, 

318 ) -> None: 

319 if query_string is not None and "?" in path: 

320 raise ValueError("Query string is defined in the path and as an argument") 

321 request_uri = urlsplit(path) 

322 if query_string is None and "?" in path: 

323 query_string = request_uri.query 

324 

325 self.path = iri_to_uri(request_uri.path) 

326 self.request_uri = path 

327 if base_url is not None: 

328 base_url = iri_to_uri(base_url) 

329 self.base_url = base_url # type: ignore 

330 if isinstance(query_string, str): 

331 self.query_string = query_string 

332 else: 

333 if query_string is None: 

334 query_string = MultiDict() 

335 elif not isinstance(query_string, MultiDict): 

336 query_string = MultiDict(query_string) 

337 self.args = query_string 

338 self.method = method 

339 if headers is None: 

340 headers = Headers() 

341 elif not isinstance(headers, Headers): 

342 headers = Headers(headers) 

343 self.headers = headers 

344 if content_type is not None: 

345 self.content_type = content_type 

346 if errors_stream is None: 

347 errors_stream = sys.stderr 

348 self.errors_stream = errors_stream 

349 self.multithread = multithread 

350 self.multiprocess = multiprocess 

351 self.run_once = run_once 

352 self.environ_base = environ_base 

353 self.environ_overrides = environ_overrides 

354 self.input_stream = input_stream 

355 self.content_length = content_length 

356 self.closed = False 

357 

358 if auth is not None: 

359 if isinstance(auth, tuple): 

360 auth = Authorization( 

361 "basic", {"username": auth[0], "password": auth[1]} 

362 ) 

363 

364 self.headers.set("Authorization", auth.to_header()) 

365 

366 if json is not None: 

367 if data is not None: 

368 raise TypeError("can't provide both json and data") 

369 

370 data = self.json_dumps(json) 

371 

372 if self.content_type is None: 

373 self.content_type = "application/json" 

374 

375 if data: 

376 if input_stream is not None: 

377 raise TypeError("can't provide input stream and data") 

378 if hasattr(data, "read"): 

379 data = data.read() 

380 if isinstance(data, str): 

381 data = data.encode() 

382 if isinstance(data, bytes): 

383 self.input_stream = BytesIO(data) 

384 if self.content_length is None: 

385 self.content_length = len(data) 

386 else: 

387 for key, value in _iter_data(data): 

388 if isinstance(value, (tuple, dict)) or hasattr(value, "read"): 

389 self._add_file_from_data(key, value) 

390 else: 

391 self.form.setlistdefault(key).append(value) 

392 

393 if mimetype is not None: 

394 self.mimetype = mimetype 

395 

396 @classmethod 

397 def from_environ(cls, environ: WSGIEnvironment, **kwargs: t.Any) -> EnvironBuilder: 

398 """Turn an environ dict back into a builder. Any extra kwargs 

399 override the args extracted from the environ. 

400 

401 .. versionchanged:: 2.0 

402 Path and query values are passed through the WSGI decoding 

403 dance to avoid double encoding. 

404 

405 .. versionadded:: 0.15 

406 """ 

407 headers = Headers(EnvironHeaders(environ)) 

408 out = { 

409 "path": _wsgi_decoding_dance(environ["PATH_INFO"]), 

410 "base_url": cls._make_base_url( 

411 environ["wsgi.url_scheme"], 

412 headers.pop("Host"), 

413 _wsgi_decoding_dance(environ["SCRIPT_NAME"]), 

414 ), 

415 "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]), 

416 "method": environ["REQUEST_METHOD"], 

417 "input_stream": environ["wsgi.input"], 

418 "content_type": headers.pop("Content-Type", None), 

419 "content_length": headers.pop("Content-Length", None), 

420 "errors_stream": environ["wsgi.errors"], 

421 "multithread": environ["wsgi.multithread"], 

422 "multiprocess": environ["wsgi.multiprocess"], 

423 "run_once": environ["wsgi.run_once"], 

424 "headers": headers, 

425 } 

426 out.update(kwargs) 

427 return cls(**out) 

428 

429 def _add_file_from_data( 

430 self, 

431 key: str, 

432 value: (t.IO[bytes] | tuple[t.IO[bytes], str] | tuple[t.IO[bytes], str, str]), 

433 ) -> None: 

434 """Called in the EnvironBuilder to add files from the data dict.""" 

435 if isinstance(value, tuple): 

436 self.files.add_file(key, *value) 

437 else: 

438 self.files.add_file(key, value) 

439 

440 @staticmethod 

441 def _make_base_url(scheme: str, host: str, script_root: str) -> str: 

442 return urlunsplit((scheme, host, script_root, "", "")).rstrip("/") + "/" 

443 

444 @property 

445 def base_url(self) -> str: 

446 """The base URL is used to extract the URL scheme, host name, 

447 port, and root path. 

448 """ 

449 return self._make_base_url(self.url_scheme, self.host, self.script_root) 

450 

451 @base_url.setter 

452 def base_url(self, value: str | None) -> None: 

453 if value is None: 

454 scheme = "http" 

455 netloc = "localhost" 

456 script_root = "" 

457 else: 

458 scheme, netloc, script_root, qs, anchor = urlsplit(value) 

459 if qs or anchor: 

460 raise ValueError("base url must not contain a query string or fragment") 

461 self.script_root = script_root.rstrip("/") 

462 self.host = netloc 

463 self.url_scheme = scheme 

464 

465 @property 

466 def content_type(self) -> str | None: 

467 """The content type for the request. Reflected from and to 

468 the :attr:`headers`. Do not set if you set :attr:`files` or 

469 :attr:`form` for auto detection. 

470 """ 

471 ct = self.headers.get("Content-Type") 

472 if ct is None and not self._input_stream: 

473 if self._files: 

474 return "multipart/form-data" 

475 if self._form: 

476 return "application/x-www-form-urlencoded" 

477 return None 

478 return ct 

479 

480 @content_type.setter 

481 def content_type(self, value: str | None) -> None: 

482 if value is None: 

483 self.headers.pop("Content-Type", None) 

484 else: 

485 self.headers["Content-Type"] = value 

486 

487 @property 

488 def mimetype(self) -> str | None: 

489 """The mimetype (content type without charset etc.) 

490 

491 .. versionadded:: 0.14 

492 """ 

493 ct = self.content_type 

494 return ct.split(";")[0].strip() if ct else None 

495 

496 @mimetype.setter 

497 def mimetype(self, value: str) -> None: 

498 self.content_type = get_content_type(value, "utf-8") 

499 

500 @property 

501 def mimetype_params(self) -> t.Mapping[str, str]: 

502 """The mimetype parameters as dict. For example if the 

503 content type is ``text/html; charset=utf-8`` the params would be 

504 ``{'charset': 'utf-8'}``. 

505 

506 .. versionadded:: 0.14 

507 """ 

508 

509 def on_update(d: CallbackDict[str, str]) -> None: 

510 self.headers["Content-Type"] = dump_options_header(self.mimetype, d) 

511 

512 d = parse_options_header(self.headers.get("content-type", ""))[1] 

513 return CallbackDict(d, on_update) 

514 

515 @property 

516 def content_length(self) -> int | None: 

517 """The content length as integer. Reflected from and to the 

518 :attr:`headers`. Do not set if you set :attr:`files` or 

519 :attr:`form` for auto detection. 

520 """ 

521 return self.headers.get("Content-Length", type=int) 

522 

523 @content_length.setter 

524 def content_length(self, value: int | None) -> None: 

525 if value is None: 

526 self.headers.pop("Content-Length", None) 

527 else: 

528 self.headers["Content-Length"] = str(value) 

529 

530 def _get_form(self, name: str, storage: type[_TAnyMultiDict]) -> _TAnyMultiDict: 

531 """Common behavior for getting the :attr:`form` and 

532 :attr:`files` properties. 

533 

534 :param name: Name of the internal cached attribute. 

535 :param storage: Storage class used for the data. 

536 """ 

537 if self.input_stream is not None: 

538 raise AttributeError("an input stream is defined") 

539 

540 rv = getattr(self, name) 

541 

542 if rv is None: 

543 rv = storage() 

544 setattr(self, name, rv) 

545 

546 return rv # type: ignore 

547 

548 def _set_form(self, name: str, value: MultiDict[str, t.Any]) -> None: 

549 """Common behavior for setting the :attr:`form` and 

550 :attr:`files` properties. 

551 

552 :param name: Name of the internal cached attribute. 

553 :param value: Value to assign to the attribute. 

554 """ 

555 self._input_stream = None 

556 setattr(self, name, value) 

557 

558 @property 

559 def form(self) -> MultiDict[str, str]: 

560 """A :class:`MultiDict` of form values.""" 

561 return self._get_form("_form", MultiDict) 

562 

563 @form.setter 

564 def form(self, value: MultiDict[str, str]) -> None: 

565 self._set_form("_form", value) 

566 

567 @property 

568 def files(self) -> FileMultiDict: 

569 """A :class:`FileMultiDict` of uploaded files. Use 

570 :meth:`~FileMultiDict.add_file` to add new files. 

571 """ 

572 return self._get_form("_files", FileMultiDict) 

573 

574 @files.setter 

575 def files(self, value: FileMultiDict) -> None: 

576 self._set_form("_files", value) 

577 

578 @property 

579 def input_stream(self) -> t.IO[bytes] | None: 

580 """An optional input stream. This is mutually exclusive with 

581 setting :attr:`form` and :attr:`files`, setting it will clear 

582 those. Do not provide this if the method is not ``POST`` or 

583 another method that has a body. 

584 """ 

585 return self._input_stream 

586 

587 @input_stream.setter 

588 def input_stream(self, value: t.IO[bytes] | None) -> None: 

589 self._input_stream = value 

590 self._form = None 

591 self._files = None 

592 

593 @property 

594 def query_string(self) -> str: 

595 """The query string. If you set this to a string 

596 :attr:`args` will no longer be available. 

597 """ 

598 if self._query_string is None: 

599 if self._args is not None: 

600 return _urlencode(self._args) 

601 return "" 

602 return self._query_string 

603 

604 @query_string.setter 

605 def query_string(self, value: str | None) -> None: 

606 self._query_string = value 

607 self._args = None 

608 

609 @property 

610 def args(self) -> MultiDict[str, str]: 

611 """The URL arguments as :class:`MultiDict`.""" 

612 if self._query_string is not None: 

613 raise AttributeError("a query string is defined") 

614 if self._args is None: 

615 self._args = MultiDict() 

616 return self._args 

617 

618 @args.setter 

619 def args(self, value: MultiDict[str, str] | None) -> None: 

620 self._query_string = None 

621 self._args = value 

622 

623 @property 

624 def server_name(self) -> str: 

625 """The server name (read-only, use :attr:`host` to set)""" 

626 return self.host.split(":", 1)[0] 

627 

628 @property 

629 def server_port(self) -> int: 

630 """The server port as integer (read-only, use :attr:`host` to set)""" 

631 pieces = self.host.split(":", 1) 

632 

633 if len(pieces) == 2: 

634 try: 

635 return int(pieces[1]) 

636 except ValueError: 

637 pass 

638 

639 if self.url_scheme == "https": 

640 return 443 

641 return 80 

642 

643 def __del__(self) -> None: 

644 try: 

645 self.close() 

646 except Exception: 

647 pass 

648 

649 def close(self) -> None: 

650 """Closes all files. If you put real :class:`file` objects into the 

651 :attr:`files` dict you can call this method to automatically close 

652 them all in one go. 

653 """ 

654 if self.closed: 

655 return 

656 try: 

657 files = self.files.values() 

658 except AttributeError: 

659 files = () 

660 for f in files: 

661 try: 

662 f.close() 

663 except Exception: 

664 pass 

665 self.closed = True 

666 

667 def get_environ(self) -> WSGIEnvironment: 

668 """Return the built environ. 

669 

670 .. versionchanged:: 0.15 

671 The content type and length headers are set based on 

672 input stream detection. Previously this only set the WSGI 

673 keys. 

674 """ 

675 input_stream = self.input_stream 

676 content_length = self.content_length 

677 

678 mimetype = self.mimetype 

679 content_type = self.content_type 

680 

681 if input_stream is not None: 

682 start_pos = input_stream.tell() 

683 input_stream.seek(0, 2) 

684 end_pos = input_stream.tell() 

685 input_stream.seek(start_pos) 

686 content_length = end_pos - start_pos 

687 elif mimetype == "multipart/form-data": 

688 input_stream, content_length, boundary = stream_encode_multipart( 

689 CombinedMultiDict([self.form, self.files]) 

690 ) 

691 content_type = f'{mimetype}; boundary="{boundary}"' 

692 elif mimetype == "application/x-www-form-urlencoded": 

693 form_encoded = _urlencode(self.form).encode("ascii") 

694 content_length = len(form_encoded) 

695 input_stream = BytesIO(form_encoded) 

696 else: 

697 input_stream = BytesIO() 

698 

699 result: WSGIEnvironment = {} 

700 if self.environ_base: 

701 result.update(self.environ_base) 

702 

703 def _path_encode(x: str) -> str: 

704 return _wsgi_encoding_dance(unquote(x)) 

705 

706 raw_uri = _wsgi_encoding_dance(self.request_uri) 

707 result.update( 

708 { 

709 "REQUEST_METHOD": self.method, 

710 "SCRIPT_NAME": _path_encode(self.script_root), 

711 "PATH_INFO": _path_encode(self.path), 

712 "QUERY_STRING": _wsgi_encoding_dance(self.query_string), 

713 # Non-standard, added by mod_wsgi, uWSGI 

714 "REQUEST_URI": raw_uri, 

715 # Non-standard, added by gunicorn 

716 "RAW_URI": raw_uri, 

717 "SERVER_NAME": self.server_name, 

718 "SERVER_PORT": str(self.server_port), 

719 "HTTP_HOST": self.host, 

720 "SERVER_PROTOCOL": self.server_protocol, 

721 "wsgi.version": self.wsgi_version, 

722 "wsgi.url_scheme": self.url_scheme, 

723 "wsgi.input": input_stream, 

724 "wsgi.errors": self.errors_stream, 

725 "wsgi.multithread": self.multithread, 

726 "wsgi.multiprocess": self.multiprocess, 

727 "wsgi.run_once": self.run_once, 

728 } 

729 ) 

730 

731 headers = self.headers.copy() 

732 # Don't send these as headers, they're part of the environ. 

733 headers.remove("Content-Type") 

734 headers.remove("Content-Length") 

735 

736 if content_type is not None: 

737 result["CONTENT_TYPE"] = content_type 

738 

739 if content_length is not None: 

740 result["CONTENT_LENGTH"] = str(content_length) 

741 

742 combined_headers = defaultdict(list) 

743 

744 for key, value in headers.to_wsgi_list(): 

745 combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value) 

746 

747 for key, values in combined_headers.items(): 

748 result[key] = ", ".join(values) 

749 

750 if self.environ_overrides: 

751 result.update(self.environ_overrides) 

752 

753 return result 

754 

755 def get_request(self, cls: type[Request] | None = None) -> Request: 

756 """Returns a request with the data. If the request class is not 

757 specified :attr:`request_class` is used. 

758 

759 :param cls: The request wrapper to use. 

760 """ 

761 if cls is None: 

762 cls = self.request_class 

763 

764 return cls(self.get_environ()) 

765 

766 

767class ClientRedirectError(Exception): 

768 """If a redirect loop is detected when using follow_redirects=True with 

769 the :cls:`Client`, then this exception is raised. 

770 """ 

771 

772 

773class Client: 

774 """Simulate sending requests to a WSGI application without running a WSGI or HTTP 

775 server. 

776 

777 :param application: The WSGI application to make requests to. 

778 :param response_wrapper: A :class:`.Response` class to wrap response data with. 

779 Defaults to :class:`.TestResponse`. If it's not a subclass of ``TestResponse``, 

780 one will be created. 

781 :param use_cookies: Persist cookies from ``Set-Cookie`` response headers to the 

782 ``Cookie`` header in subsequent requests. Domain and path matching is supported, 

783 but other cookie parameters are ignored. 

784 :param allow_subdomain_redirects: Allow requests to follow redirects to subdomains. 

785 Enable this if the application handles subdomains and redirects between them. 

786 

787 .. versionchanged:: 2.3 

788 Simplify cookie implementation, support domain and path matching. 

789 

790 .. versionchanged:: 2.1 

791 All data is available as properties on the returned response object. The 

792 response cannot be returned as a tuple. 

793 

794 .. versionchanged:: 2.0 

795 ``response_wrapper`` is always a subclass of :class:``TestResponse``. 

796 

797 .. versionchanged:: 0.5 

798 Added the ``use_cookies`` parameter. 

799 """ 

800 

801 def __init__( 

802 self, 

803 application: WSGIApplication, 

804 response_wrapper: type[Response] | None = None, 

805 use_cookies: bool = True, 

806 allow_subdomain_redirects: bool = False, 

807 ) -> None: 

808 self.application = application 

809 

810 if response_wrapper in {None, Response}: 

811 response_wrapper = TestResponse 

812 elif response_wrapper is not None and not issubclass( 

813 response_wrapper, TestResponse 

814 ): 

815 response_wrapper = type( 

816 "WrapperTestResponse", 

817 (TestResponse, response_wrapper), 

818 {}, 

819 ) 

820 

821 self.response_wrapper = t.cast(type["TestResponse"], response_wrapper) 

822 

823 if use_cookies: 

824 self._cookies: dict[tuple[str, str, str], Cookie] | None = {} 

825 else: 

826 self._cookies = None 

827 

828 self.allow_subdomain_redirects = allow_subdomain_redirects 

829 

830 def get_cookie( 

831 self, key: str, domain: str = "localhost", path: str = "/" 

832 ) -> Cookie | None: 

833 """Return a :class:`.Cookie` if it exists. Cookies are uniquely identified by 

834 ``(domain, path, key)``. 

835 

836 :param key: The decoded form of the key for the cookie. 

837 :param domain: The domain the cookie was set for. 

838 :param path: The path the cookie was set for. 

839 

840 .. versionadded:: 2.3 

841 """ 

842 if self._cookies is None: 

843 raise TypeError( 

844 "Cookies are disabled. Create a client with 'use_cookies=True'." 

845 ) 

846 

847 return self._cookies.get((domain, path, key)) 

848 

849 def set_cookie( 

850 self, 

851 key: str, 

852 value: str = "", 

853 *, 

854 domain: str = "localhost", 

855 origin_only: bool = True, 

856 path: str = "/", 

857 **kwargs: t.Any, 

858 ) -> None: 

859 """Set a cookie to be sent in subsequent requests. 

860 

861 This is a convenience to skip making a test request to a route that would set 

862 the cookie. To test the cookie, make a test request to a route that uses the 

863 cookie value. 

864 

865 The client uses ``domain``, ``origin_only``, and ``path`` to determine which 

866 cookies to send with a request. It does not use other cookie parameters that 

867 browsers use, since they're not applicable in tests. 

868 

869 :param key: The key part of the cookie. 

870 :param value: The value part of the cookie. 

871 :param domain: Send this cookie with requests that match this domain. If 

872 ``origin_only`` is true, it must be an exact match, otherwise it may be a 

873 suffix match. 

874 :param origin_only: Whether the domain must be an exact match to the request. 

875 :param path: Send this cookie with requests that match this path either exactly 

876 or as a prefix. 

877 :param kwargs: Passed to :func:`.dump_cookie`. 

878 

879 .. versionchanged:: 3.0 

880 The parameter ``server_name`` is removed. The first parameter is 

881 ``key``. Use the ``domain`` and ``origin_only`` parameters instead. 

882 

883 .. versionchanged:: 2.3 

884 The ``origin_only`` parameter was added. 

885 

886 .. versionchanged:: 2.3 

887 The ``domain`` parameter defaults to ``localhost``. 

888 """ 

889 if self._cookies is None: 

890 raise TypeError( 

891 "Cookies are disabled. Create a client with 'use_cookies=True'." 

892 ) 

893 

894 cookie = Cookie._from_response_header( 

895 domain, "/", dump_cookie(key, value, domain=domain, path=path, **kwargs) 

896 ) 

897 cookie.origin_only = origin_only 

898 

899 if cookie._should_delete: 

900 self._cookies.pop(cookie._storage_key, None) 

901 else: 

902 self._cookies[cookie._storage_key] = cookie 

903 

904 def delete_cookie( 

905 self, 

906 key: str, 

907 *, 

908 domain: str = "localhost", 

909 path: str = "/", 

910 ) -> None: 

911 """Delete a cookie if it exists. Cookies are uniquely identified by 

912 ``(domain, path, key)``. 

913 

914 :param key: The decoded form of the key for the cookie. 

915 :param domain: The domain the cookie was set for. 

916 :param path: The path the cookie was set for. 

917 

918 .. versionchanged:: 3.0 

919 The ``server_name`` parameter is removed. The first parameter is 

920 ``key``. Use the ``domain`` parameter instead. 

921 

922 .. versionchanged:: 3.0 

923 The ``secure``, ``httponly`` and ``samesite`` parameters are removed. 

924 

925 .. versionchanged:: 2.3 

926 The ``domain`` parameter defaults to ``localhost``. 

927 """ 

928 if self._cookies is None: 

929 raise TypeError( 

930 "Cookies are disabled. Create a client with 'use_cookies=True'." 

931 ) 

932 

933 self._cookies.pop((domain, path, key), None) 

934 

935 def _add_cookies_to_wsgi(self, environ: WSGIEnvironment) -> None: 

936 """If cookies are enabled, set the ``Cookie`` header in the environ to the 

937 cookies that are applicable to the request host and path. 

938 

939 :meta private: 

940 

941 .. versionadded:: 2.3 

942 """ 

943 if self._cookies is None: 

944 return 

945 

946 url = urlsplit(get_current_url(environ)) 

947 server_name = url.hostname or "localhost" 

948 value = "; ".join( 

949 c._to_request_header() 

950 for c in self._cookies.values() 

951 if c._matches_request(server_name, url.path) 

952 ) 

953 

954 if value: 

955 environ["HTTP_COOKIE"] = value 

956 else: 

957 environ.pop("HTTP_COOKIE", None) 

958 

959 def _update_cookies_from_response( 

960 self, server_name: str, path: str, headers: list[str] 

961 ) -> None: 

962 """If cookies are enabled, update the stored cookies from any ``Set-Cookie`` 

963 headers in the response. 

964 

965 :meta private: 

966 

967 .. versionadded:: 2.3 

968 """ 

969 if self._cookies is None: 

970 return 

971 

972 for header in headers: 

973 cookie = Cookie._from_response_header(server_name, path, header) 

974 

975 if cookie._should_delete: 

976 self._cookies.pop(cookie._storage_key, None) 

977 else: 

978 self._cookies[cookie._storage_key] = cookie 

979 

980 def run_wsgi_app( 

981 self, environ: WSGIEnvironment, buffered: bool = False 

982 ) -> tuple[t.Iterable[bytes], str, Headers]: 

983 """Runs the wrapped WSGI app with the given environment. 

984 

985 :meta private: 

986 """ 

987 self._add_cookies_to_wsgi(environ) 

988 rv = run_wsgi_app(self.application, environ, buffered=buffered) 

989 url = urlsplit(get_current_url(environ)) 

990 self._update_cookies_from_response( 

991 url.hostname or "localhost", url.path, rv[2].getlist("Set-Cookie") 

992 ) 

993 return rv 

994 

995 def resolve_redirect( 

996 self, response: TestResponse, buffered: bool = False 

997 ) -> TestResponse: 

998 """Perform a new request to the location given by the redirect 

999 response to the previous request. 

1000 

1001 :meta private: 

1002 """ 

1003 scheme, netloc, path, qs, anchor = urlsplit(response.location) 

1004 builder = EnvironBuilder.from_environ( 

1005 response.request.environ, path=path, query_string=qs 

1006 ) 

1007 

1008 to_name_parts = netloc.split(":", 1)[0].split(".") 

1009 from_name_parts = builder.server_name.split(".") 

1010 

1011 if to_name_parts != [""]: 

1012 # The new location has a host, use it for the base URL. 

1013 builder.url_scheme = scheme 

1014 builder.host = netloc 

1015 else: 

1016 # A local redirect with autocorrect_location_header=False 

1017 # doesn't have a host, so use the request's host. 

1018 to_name_parts = from_name_parts 

1019 

1020 # Explain why a redirect to a different server name won't be followed. 

1021 if to_name_parts != from_name_parts: 

1022 if to_name_parts[-len(from_name_parts) :] == from_name_parts: 

1023 if not self.allow_subdomain_redirects: 

1024 raise RuntimeError("Following subdomain redirects is not enabled.") 

1025 else: 

1026 raise RuntimeError("Following external redirects is not supported.") 

1027 

1028 path_parts = path.split("/") 

1029 root_parts = builder.script_root.split("/") 

1030 

1031 if path_parts[: len(root_parts)] == root_parts: 

1032 # Strip the script root from the path. 

1033 builder.path = path[len(builder.script_root) :] 

1034 else: 

1035 # The new location is not under the script root, so use the 

1036 # whole path and clear the previous root. 

1037 builder.path = path 

1038 builder.script_root = "" 

1039 

1040 # Only 307 and 308 preserve all of the original request. 

1041 if response.status_code not in {307, 308}: 

1042 # HEAD is preserved, everything else becomes GET. 

1043 if builder.method != "HEAD": 

1044 builder.method = "GET" 

1045 

1046 # Clear the body and the headers that describe it. 

1047 

1048 if builder.input_stream is not None: 

1049 builder.input_stream.close() 

1050 builder.input_stream = None 

1051 

1052 builder.content_type = None 

1053 builder.content_length = None 

1054 builder.headers.pop("Transfer-Encoding", None) 

1055 

1056 return self.open(builder, buffered=buffered) 

1057 

1058 def open( 

1059 self, 

1060 *args: t.Any, 

1061 buffered: bool = False, 

1062 follow_redirects: bool = False, 

1063 **kwargs: t.Any, 

1064 ) -> TestResponse: 

1065 """Generate an environ dict from the given arguments, make a 

1066 request to the application using it, and return the response. 

1067 

1068 :param args: Passed to :class:`EnvironBuilder` to create the 

1069 environ for the request. If a single arg is passed, it can 

1070 be an existing :class:`EnvironBuilder` or an environ dict. 

1071 :param buffered: Convert the iterator returned by the app into 

1072 a list. If the iterator has a ``close()`` method, it is 

1073 called automatically. 

1074 :param follow_redirects: Make additional requests to follow HTTP 

1075 redirects until a non-redirect status is returned. 

1076 :attr:`TestResponse.history` lists the intermediate 

1077 responses. 

1078 

1079 .. versionchanged:: 2.1 

1080 Removed the ``as_tuple`` parameter. 

1081 

1082 .. versionchanged:: 2.0 

1083 The request input stream is closed when calling 

1084 ``response.close()``. Input streams for redirects are 

1085 automatically closed. 

1086 

1087 .. versionchanged:: 0.5 

1088 If a dict is provided as file in the dict for the ``data`` 

1089 parameter the content type has to be called ``content_type`` 

1090 instead of ``mimetype``. This change was made for 

1091 consistency with :class:`werkzeug.FileWrapper`. 

1092 

1093 .. versionchanged:: 0.5 

1094 Added the ``follow_redirects`` parameter. 

1095 """ 

1096 request: Request | None = None 

1097 

1098 if not kwargs and len(args) == 1: 

1099 arg = args[0] 

1100 

1101 if isinstance(arg, EnvironBuilder): 

1102 request = arg.get_request() 

1103 elif isinstance(arg, dict): 

1104 request = EnvironBuilder.from_environ(arg).get_request() 

1105 elif isinstance(arg, Request): 

1106 request = arg 

1107 

1108 if request is None: 

1109 builder = EnvironBuilder(*args, **kwargs) 

1110 

1111 try: 

1112 request = builder.get_request() 

1113 finally: 

1114 builder.close() 

1115 

1116 response_parts = self.run_wsgi_app(request.environ, buffered=buffered) 

1117 response = self.response_wrapper(*response_parts, request=request) 

1118 

1119 redirects = set() 

1120 history: list[TestResponse] = [] 

1121 

1122 if not follow_redirects: 

1123 return response 

1124 

1125 while response.status_code in { 

1126 301, 

1127 302, 

1128 303, 

1129 305, 

1130 307, 

1131 308, 

1132 }: 

1133 # Exhaust intermediate response bodies to ensure middleware 

1134 # that returns an iterator runs any cleanup code. 

1135 if not buffered: 

1136 response.make_sequence() 

1137 response.close() 

1138 

1139 new_redirect_entry = (response.location, response.status_code) 

1140 

1141 if new_redirect_entry in redirects: 

1142 raise ClientRedirectError( 

1143 f"Loop detected: A {response.status_code} redirect" 

1144 f" to {response.location} was already made." 

1145 ) 

1146 

1147 redirects.add(new_redirect_entry) 

1148 response.history = tuple(history) 

1149 history.append(response) 

1150 response = self.resolve_redirect(response, buffered=buffered) 

1151 else: 

1152 # This is the final request after redirects. 

1153 response.history = tuple(history) 

1154 # Close the input stream when closing the response, in case 

1155 # the input is an open temporary file. 

1156 response.call_on_close(request.input_stream.close) 

1157 return response 

1158 

1159 def get(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1160 """Call :meth:`open` with ``method`` set to ``GET``.""" 

1161 kw["method"] = "GET" 

1162 return self.open(*args, **kw) 

1163 

1164 def post(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1165 """Call :meth:`open` with ``method`` set to ``POST``.""" 

1166 kw["method"] = "POST" 

1167 return self.open(*args, **kw) 

1168 

1169 def put(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1170 """Call :meth:`open` with ``method`` set to ``PUT``.""" 

1171 kw["method"] = "PUT" 

1172 return self.open(*args, **kw) 

1173 

1174 def delete(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1175 """Call :meth:`open` with ``method`` set to ``DELETE``.""" 

1176 kw["method"] = "DELETE" 

1177 return self.open(*args, **kw) 

1178 

1179 def patch(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1180 """Call :meth:`open` with ``method`` set to ``PATCH``.""" 

1181 kw["method"] = "PATCH" 

1182 return self.open(*args, **kw) 

1183 

1184 def options(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1185 """Call :meth:`open` with ``method`` set to ``OPTIONS``.""" 

1186 kw["method"] = "OPTIONS" 

1187 return self.open(*args, **kw) 

1188 

1189 def head(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1190 """Call :meth:`open` with ``method`` set to ``HEAD``.""" 

1191 kw["method"] = "HEAD" 

1192 return self.open(*args, **kw) 

1193 

1194 def trace(self, *args: t.Any, **kw: t.Any) -> TestResponse: 

1195 """Call :meth:`open` with ``method`` set to ``TRACE``.""" 

1196 kw["method"] = "TRACE" 

1197 return self.open(*args, **kw) 

1198 

1199 def __repr__(self) -> str: 

1200 return f"<{type(self).__name__} {self.application!r}>" 

1201 

1202 

1203def create_environ(*args: t.Any, **kwargs: t.Any) -> WSGIEnvironment: 

1204 """Create a new WSGI environ dict based on the values passed. The first 

1205 parameter should be the path of the request which defaults to '/'. The 

1206 second one can either be an absolute path (in that case the host is 

1207 localhost:80) or a full path to the request with scheme, netloc port and 

1208 the path to the script. 

1209 

1210 This accepts the same arguments as the :class:`EnvironBuilder` 

1211 constructor. 

1212 

1213 .. versionchanged:: 0.5 

1214 This function is now a thin wrapper over :class:`EnvironBuilder` which 

1215 was added in 0.5. The `headers`, `environ_base`, `environ_overrides` 

1216 and `charset` parameters were added. 

1217 """ 

1218 builder = EnvironBuilder(*args, **kwargs) 

1219 

1220 try: 

1221 return builder.get_environ() 

1222 finally: 

1223 builder.close() 

1224 

1225 

1226def run_wsgi_app( 

1227 app: WSGIApplication, environ: WSGIEnvironment, buffered: bool = False 

1228) -> tuple[t.Iterable[bytes], str, Headers]: 

1229 """Return a tuple in the form (app_iter, status, headers) of the 

1230 application output. This works best if you pass it an application that 

1231 returns an iterator all the time. 

1232 

1233 Sometimes applications may use the `write()` callable returned 

1234 by the `start_response` function. This tries to resolve such edge 

1235 cases automatically. But if you don't get the expected output you 

1236 should set `buffered` to `True` which enforces buffering. 

1237 

1238 If passed an invalid WSGI application the behavior of this function is 

1239 undefined. Never pass non-conforming WSGI applications to this function. 

1240 

1241 :param app: the application to execute. 

1242 :param buffered: set to `True` to enforce buffering. 

1243 :return: tuple in the form ``(app_iter, status, headers)`` 

1244 """ 

1245 # Copy environ to ensure any mutations by the app (ProxyFix, for 

1246 # example) don't affect subsequent requests (such as redirects). 

1247 environ = _get_environ(environ).copy() 

1248 status: str 

1249 response: tuple[str, list[tuple[str, str]]] | None = None 

1250 buffer: list[bytes] = [] 

1251 

1252 def start_response(status, headers, exc_info=None): # type: ignore 

1253 nonlocal response 

1254 

1255 if exc_info: 

1256 try: 

1257 raise exc_info[1].with_traceback(exc_info[2]) 

1258 finally: 

1259 exc_info = None 

1260 

1261 response = (status, headers) 

1262 return buffer.append 

1263 

1264 app_rv = app(environ, start_response) 

1265 close_func = getattr(app_rv, "close", None) 

1266 app_iter: t.Iterable[bytes] = iter(app_rv) 

1267 

1268 # when buffering we emit the close call early and convert the 

1269 # application iterator into a regular list 

1270 if buffered: 

1271 try: 

1272 app_iter = list(app_iter) 

1273 finally: 

1274 if close_func is not None: 

1275 close_func() 

1276 

1277 # otherwise we iterate the application iter until we have a response, chain 

1278 # the already received data with the already collected data and wrap it in 

1279 # a new `ClosingIterator` if we need to restore a `close` callable from the 

1280 # original return value. 

1281 else: 

1282 for item in app_iter: 

1283 buffer.append(item) 

1284 

1285 if response is not None: 

1286 break 

1287 

1288 if buffer: 

1289 app_iter = chain(buffer, app_iter) 

1290 

1291 if close_func is not None and app_iter is not app_rv: 

1292 app_iter = ClosingIterator(app_iter, close_func) 

1293 

1294 status, headers = response # type: ignore 

1295 return app_iter, status, Headers(headers) 

1296 

1297 

1298class TestResponse(Response): 

1299 """:class:`~werkzeug.wrappers.Response` subclass that provides extra 

1300 information about requests made with the test :class:`Client`. 

1301 

1302 Test client requests will always return an instance of this class. 

1303 If a custom response class is passed to the client, it is 

1304 subclassed along with this to support test information. 

1305 

1306 If the test request included large files, or if the application is 

1307 serving a file, call :meth:`close` to close any open files and 

1308 prevent Python showing a ``ResourceWarning``. 

1309 

1310 .. versionchanged:: 2.2 

1311 Set the ``default_mimetype`` to None to prevent a mimetype being 

1312 assumed if missing. 

1313 

1314 .. versionchanged:: 2.1 

1315 Response instances cannot be treated as tuples. 

1316 

1317 .. versionadded:: 2.0 

1318 Test client methods always return instances of this class. 

1319 """ 

1320 

1321 default_mimetype = None 

1322 # Don't assume a mimetype, instead use whatever the response provides 

1323 

1324 request: Request 

1325 """A request object with the environ used to make the request that 

1326 resulted in this response. 

1327 """ 

1328 

1329 history: tuple[TestResponse, ...] 

1330 """A list of intermediate responses. Populated when the test request 

1331 is made with ``follow_redirects`` enabled. 

1332 """ 

1333 

1334 # Tell Pytest to ignore this, it's not a test class. 

1335 __test__ = False 

1336 

1337 def __init__( 

1338 self, 

1339 response: t.Iterable[bytes], 

1340 status: str, 

1341 headers: Headers, 

1342 request: Request, 

1343 history: tuple[TestResponse] = (), # type: ignore 

1344 **kwargs: t.Any, 

1345 ) -> None: 

1346 super().__init__(response, status, headers, **kwargs) 

1347 self.request = request 

1348 self.history = history 

1349 self._compat_tuple = response, status, headers 

1350 

1351 @cached_property 

1352 def text(self) -> str: 

1353 """The response data as text. A shortcut for 

1354 ``response.get_data(as_text=True)``. 

1355 

1356 .. versionadded:: 2.1 

1357 """ 

1358 return self.get_data(as_text=True) 

1359 

1360 

1361@dataclasses.dataclass 

1362class Cookie: 

1363 """A cookie key, value, and parameters. 

1364 

1365 The class itself is not a public API. Its attributes are documented for inspection 

1366 with :meth:`.Client.get_cookie` only. 

1367 

1368 .. versionadded:: 2.3 

1369 """ 

1370 

1371 key: str 

1372 """The cookie key, encoded as a client would see it.""" 

1373 

1374 value: str 

1375 """The cookie key, encoded as a client would see it.""" 

1376 

1377 decoded_key: str 

1378 """The cookie key, decoded as the application would set and see it.""" 

1379 

1380 decoded_value: str 

1381 """The cookie value, decoded as the application would set and see it.""" 

1382 

1383 expires: datetime | None 

1384 """The time at which the cookie is no longer valid.""" 

1385 

1386 max_age: int | None 

1387 """The number of seconds from when the cookie was set at which it is 

1388 no longer valid. 

1389 """ 

1390 

1391 domain: str 

1392 """The domain that the cookie was set for, or the request domain if not set.""" 

1393 

1394 origin_only: bool 

1395 """Whether the cookie will be sent for exact domain matches only. This is ``True`` 

1396 if the ``Domain`` parameter was not present. 

1397 """ 

1398 

1399 path: str 

1400 """The path that the cookie was set for.""" 

1401 

1402 secure: bool | None 

1403 """The ``Secure`` parameter.""" 

1404 

1405 http_only: bool | None 

1406 """The ``HttpOnly`` parameter.""" 

1407 

1408 same_site: str | None 

1409 """The ``SameSite`` parameter.""" 

1410 

1411 def _matches_request(self, server_name: str, path: str) -> bool: 

1412 return ( 

1413 server_name == self.domain 

1414 or ( 

1415 not self.origin_only 

1416 and server_name.endswith(self.domain) 

1417 and server_name[: -len(self.domain)].endswith(".") 

1418 ) 

1419 ) and ( 

1420 path == self.path 

1421 or ( 

1422 path.startswith(self.path) 

1423 and path[len(self.path) - self.path.endswith("/") :].startswith("/") 

1424 ) 

1425 ) 

1426 

1427 def _to_request_header(self) -> str: 

1428 return f"{self.key}={self.value}" 

1429 

1430 @classmethod 

1431 def _from_response_header(cls, server_name: str, path: str, header: str) -> te.Self: 

1432 header, _, parameters_str = header.partition(";") 

1433 key, _, value = header.partition("=") 

1434 decoded_key, decoded_value = next(parse_cookie(header).items()) # type: ignore[call-overload] 

1435 params = {} 

1436 

1437 for item in parameters_str.split(";"): 

1438 k, sep, v = item.partition("=") 

1439 params[k.strip().lower()] = v.strip() if sep else None 

1440 

1441 return cls( 

1442 key=key.strip(), 

1443 value=value.strip(), 

1444 decoded_key=decoded_key, 

1445 decoded_value=decoded_value, 

1446 expires=parse_date(params.get("expires")), 

1447 max_age=int(params["max-age"] or 0) if "max-age" in params else None, 

1448 domain=params.get("domain") or server_name, 

1449 origin_only="domain" not in params, 

1450 path=params.get("path") or path.rpartition("/")[0] or "/", 

1451 secure="secure" in params, 

1452 http_only="httponly" in params, 

1453 same_site=params.get("samesite"), 

1454 ) 

1455 

1456 @property 

1457 def _storage_key(self) -> tuple[str, str, str]: 

1458 return self.domain, self.path, self.decoded_key 

1459 

1460 @property 

1461 def _should_delete(self) -> bool: 

1462 return self.max_age == 0 or ( 

1463 self.expires is not None and self.expires.timestamp() == 0 

1464 )