Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpx/_models.py: 23%


647 statements  

1from __future__ import annotations 

2 

3import codecs 

4import datetime 

5import email.message 

6import json as jsonlib 

7import re 

8import typing 

9import urllib.request 

10from collections.abc import Mapping 

11from http.cookiejar import Cookie, CookieJar 

12 

13from ._content import ByteStream, UnattachedStream, encode_request, encode_response 

14from ._decoders import ( 

15 SUPPORTED_DECODERS, 

16 ByteChunker, 

17 ContentDecoder, 

18 IdentityDecoder, 

19 LineDecoder, 

20 MultiDecoder, 

21 TextChunker, 

22 TextDecoder, 

23) 

24from ._exceptions import ( 

25 CookieConflict, 

26 HTTPStatusError, 

27 RequestNotRead, 

28 ResponseNotRead, 

29 StreamClosed, 

30 StreamConsumed, 

31 request_context, 

32) 

33from ._multipart import get_multipart_boundary_from_content_type 

34from ._status_codes import codes 

35from ._types import ( 

36 AsyncByteStream, 

37 CookieTypes, 

38 HeaderTypes, 

39 QueryParamTypes, 

40 RequestContent, 

41 RequestData, 

42 RequestExtensions, 

43 RequestFiles, 

44 ResponseContent, 

45 ResponseExtensions, 

46 SyncByteStream, 

47) 

48from ._urls import URL 

49from ._utils import to_bytes_or_str, to_str 

50 

51__all__ = ["Cookies", "Headers", "Request", "Response"] 

52 

53SENSITIVE_HEADERS = {"authorization", "proxy-authorization"} 

54 

55 

56def _is_known_encoding(encoding: str) -> bool: 

57 """ 

58 Return `True` if `encoding` is a known codec. 

59 """ 

60 try: 

61 codecs.lookup(encoding) 

62 except LookupError: 

63 return False 

64 return True 

65 

66 

67def _normalize_header_key(key: str | bytes, encoding: str | None = None) -> bytes: 

68 """ 

69 Coerce str/bytes into a strictly byte-wise HTTP header key. 

70 """ 

71 return key if isinstance(key, bytes) else key.encode(encoding or "ascii") 

72 

73 

74def _normalize_header_value(value: str | bytes, encoding: str | None = None) -> bytes: 

75 """ 

76 Coerce str/bytes into a strictly byte-wise HTTP header value. 

77 """ 

78 if isinstance(value, bytes): 

79 return value 

80 if not isinstance(value, str): 

81 raise TypeError(f"Header value must be str or bytes, not {type(value)}") 

82 return value.encode(encoding or "ascii") 

83 

84 

85def _parse_content_type_charset(content_type: str) -> str | None: 

86 # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery. 

87 # See: https://peps.python.org/pep-0594/#cgi 

88 msg = email.message.Message() 

89 msg["content-type"] = content_type 

90 return msg.get_content_charset(failobj=None) 

91 
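As an aside, a tiny sketch of what this private helper returns (the `httpx._models` import is for illustration only; private names can change between releases):

    from httpx._models import _parse_content_type_charset

    _parse_content_type_charset("text/html; charset=ISO-8859-4")  # 'iso-8859-4'
    _parse_content_type_charset("application/json")               # None (no charset parameter)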

92 

93def _parse_header_links(value: str) -> list[dict[str, str]]: 

94 """ 

95 Returns a list of parsed link headers, for more info see: 

96 https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link 

97 The generic syntax of those is: 

98 Link: < uri-reference >; param1=value1; param2="value2" 

99 So for instance: 

100 Link: '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;' 

101 would return 

102 [ 

103 {"url": "http:/.../front.jpeg", "type": "image/jpeg"}, 

104 {"url": "http://.../back.jpeg"}, 

105 ] 

106 :param value: HTTP Link entity-header field 

107 :return: list of parsed link headers 

108 """ 

109 links: list[dict[str, str]] = [] 

110 replace_chars = " '\"" 

111 value = value.strip(replace_chars) 

112 if not value: 

113 return links 

114 for val in re.split(", *<", value): 

115 try: 

116 url, params = val.split(";", 1) 

117 except ValueError: 

118 url, params = val, "" 

119 link = {"url": url.strip("<> '\"")} 

120 for param in params.split(";"): 

121 try: 

122 key, value = param.split("=") 

123 except ValueError: 

124 break 

125 link[key.strip(replace_chars)] = value.strip(replace_chars) 

126 links.append(link) 

127 return links 

128 

129 

130def _obfuscate_sensitive_headers( 

131 items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]], 

132) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]: 

133 for k, v in items: 

134 if to_str(k.lower()) in SENSITIVE_HEADERS: 

135 v = to_bytes_or_str("[secure]", match_type_of=v) 

136 yield k, v 

137 

138 

139class Headers(typing.MutableMapping[str, str]): 

140 """ 

141 HTTP headers, as a case-insensitive multi-dict. 

142 """ 

143 

144 def __init__( 

145 self, 

146 headers: HeaderTypes | None = None, 

147 encoding: str | None = None, 

148 ) -> None: 

149 self._list = [] # type: typing.List[typing.Tuple[bytes, bytes, bytes]] 

150 

151 if isinstance(headers, Headers): 

152 self._list = list(headers._list) 

153 elif isinstance(headers, Mapping): 

154 for k, v in headers.items(): 

155 bytes_key = _normalize_header_key(k, encoding) 

156 bytes_value = _normalize_header_value(v, encoding) 

157 self._list.append((bytes_key, bytes_key.lower(), bytes_value)) 

158 elif headers is not None: 

159 for k, v in headers: 

160 bytes_key = _normalize_header_key(k, encoding) 

161 bytes_value = _normalize_header_value(v, encoding) 

162 self._list.append((bytes_key, bytes_key.lower(), bytes_value)) 

163 

164 self._encoding = encoding 

165 

166 @property 

167 def encoding(self) -> str: 

168 """ 

169 Header encoding is mandated as ascii, but we allow fallbacks to utf-8 

170 or iso-8859-1. 

171 """ 

172 if self._encoding is None: 

173 for encoding in ["ascii", "utf-8"]: 

174 for key, value in self.raw: 

175 try: 

176 key.decode(encoding) 

177 value.decode(encoding) 

178 except UnicodeDecodeError: 

179 break 

180 else: 

181 # The else block runs if 'break' did not occur, meaning 

182 # all values fitted the encoding. 

183 self._encoding = encoding 

184 break 

185 else: 

186 # The ISO-8859-1 encoding covers all 256 code points in a byte, 

187 # so will never raise decode errors. 

188 self._encoding = "iso-8859-1" 

189 return self._encoding 

190 

191 @encoding.setter 

192 def encoding(self, value: str) -> None: 

193 self._encoding = value 

194 

195 @property 

196 def raw(self) -> list[tuple[bytes, bytes]]: 

197 """ 

198 Returns a list of the raw header items, as byte pairs. 

199 """ 

200 return [(raw_key, value) for raw_key, _, value in self._list] 

201 

202 def keys(self) -> typing.KeysView[str]: 

203 return {key.decode(self.encoding): None for _, key, value in self._list}.keys() 

204 

205 def values(self) -> typing.ValuesView[str]: 

206 values_dict: dict[str, str] = {} 

207 for _, key, value in self._list: 

208 str_key = key.decode(self.encoding) 

209 str_value = value.decode(self.encoding) 

210 if str_key in values_dict: 

211 values_dict[str_key] += f", {str_value}" 

212 else: 

213 values_dict[str_key] = str_value 

214 return values_dict.values() 

215 

216 def items(self) -> typing.ItemsView[str, str]: 

217 """ 

218 Return `(key, value)` items of headers. Concatenate headers 

219 into a single comma separated value when a key occurs multiple times. 

220 """ 

221 values_dict: dict[str, str] = {} 

222 for _, key, value in self._list: 

223 str_key = key.decode(self.encoding) 

224 str_value = value.decode(self.encoding) 

225 if str_key in values_dict: 

226 values_dict[str_key] += f", {str_value}" 

227 else: 

228 values_dict[str_key] = str_value 

229 return values_dict.items() 

230 

231 def multi_items(self) -> list[tuple[str, str]]: 

232 """ 

233 Return a list of `(key, value)` pairs of headers. Allow multiple 

234 occurrences of the same key without concatenating into a single 

235 comma separated value. 

236 """ 

237 return [ 

238 (key.decode(self.encoding), value.decode(self.encoding)) 

239 for _, key, value in self._list 

240 ] 

241 

242 def get(self, key: str, default: typing.Any = None) -> typing.Any: 

243 """ 

244 Return a header value. If multiple occurrences of the header occur 

245 then concatenate them together with commas. 

246 """ 

247 try: 

248 return self[key] 

249 except KeyError: 

250 return default 

251 

252 def get_list(self, key: str, split_commas: bool = False) -> list[str]: 

253 """ 

254 Return a list of all header values for a given key. 

255 If `split_commas=True` is passed, then any comma separated header 

256 values are split into multiple return strings. 

257 """ 

258 get_header_key = key.lower().encode(self.encoding) 

259 

260 values = [ 

261 item_value.decode(self.encoding) 

262 for _, item_key, item_value in self._list 

263 if item_key.lower() == get_header_key 

264 ] 

265 

266 if not split_commas: 

267 return values 

268 

269 split_values = [] 

270 for value in values: 

271 split_values.extend([item.strip() for item in value.split(",")]) 

272 return split_values 

273 
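For illustration, a short sketch of the multi-dict behaviour described in the docstrings above, using the public `Headers` class:

    from httpx import Headers

    h = Headers([("Accept", "text/html"), ("accept", "application/json")])
    h["accept"]       # 'text/html, application/json'
    h.get_list("Accept")   # ['text/html', 'application/json']
    h.multi_items()        # [('accept', 'text/html'), ('accept', 'application/json')]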

274 def update(self, headers: HeaderTypes | None = None) -> None: # type: ignore 

275 headers = Headers(headers) 

276 for key in headers.keys(): 

277 if key in self: 

278 self.pop(key) 

279 self._list.extend(headers._list) 

280 

281 def copy(self) -> Headers: 

282 return Headers(self, encoding=self.encoding) 

283 

284 def __getitem__(self, key: str) -> str: 

285 """ 

286 Return a single header value. 

287 

288 If there are multiple headers with the same key, then we concatenate 

289 them with commas. See: https://tools.ietf.org/html/rfc7230#section-3.2.2 

290 """ 

291 normalized_key = key.lower().encode(self.encoding) 

292 

293 items = [ 

294 header_value.decode(self.encoding) 

295 for _, header_key, header_value in self._list 

296 if header_key == normalized_key 

297 ] 

298 

299 if items: 

300 return ", ".join(items) 

301 

302 raise KeyError(key) 

303 

304 def __setitem__(self, key: str, value: str) -> None: 

305 """ 

306 Set the header `key` to `value`, removing any duplicate entries. 

307 Retains insertion order. 

308 """ 

309 set_key = key.encode(self._encoding or "utf-8") 

310 set_value = value.encode(self._encoding or "utf-8") 

311 lookup_key = set_key.lower() 

312 

313 found_indexes = [ 

314 idx 

315 for idx, (_, item_key, _) in enumerate(self._list) 

316 if item_key == lookup_key 

317 ] 

318 

319 for idx in reversed(found_indexes[1:]): 

320 del self._list[idx] 

321 

322 if found_indexes: 

323 idx = found_indexes[0] 

324 self._list[idx] = (set_key, lookup_key, set_value) 

325 else: 

326 self._list.append((set_key, lookup_key, set_value)) 

327 

328 def __delitem__(self, key: str) -> None: 

329 """ 

330 Remove the header `key`. 

331 """ 

332 del_key = key.lower().encode(self.encoding) 

333 

334 pop_indexes = [ 

335 idx 

336 for idx, (_, item_key, _) in enumerate(self._list) 

337 if item_key.lower() == del_key 

338 ] 

339 

340 if not pop_indexes: 

341 raise KeyError(key) 

342 

343 for idx in reversed(pop_indexes): 

344 del self._list[idx] 

345 

346 def __contains__(self, key: typing.Any) -> bool: 

347 header_key = key.lower().encode(self.encoding) 

348 return header_key in [key for _, key, _ in self._list] 

349 

350 def __iter__(self) -> typing.Iterator[typing.Any]: 

351 return iter(self.keys()) 

352 

353 def __len__(self) -> int: 

354 return len(self._list) 

355 

356 def __eq__(self, other: typing.Any) -> bool: 

357 try: 

358 other_headers = Headers(other) 

359 except ValueError: 

360 return False 

361 

362 self_list = [(key, value) for _, key, value in self._list] 

363 other_list = [(key, value) for _, key, value in other_headers._list] 

364 return sorted(self_list) == sorted(other_list) 

365 

366 def __repr__(self) -> str: 

367 class_name = self.__class__.__name__ 

368 

369 encoding_str = "" 

370 if self.encoding != "ascii": 

371 encoding_str = f", encoding={self.encoding!r}" 

372 

373 as_list = list(_obfuscate_sensitive_headers(self.multi_items())) 

374 as_dict = dict(as_list) 

375 

376 no_duplicate_keys = len(as_dict) == len(as_list) 

377 if no_duplicate_keys: 

378 return f"{class_name}({as_dict!r}{encoding_str})" 

379 return f"{class_name}({as_list!r}{encoding_str})" 

380 
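A small sketch of the resulting repr; sensitive headers are masked and keys appear lower-cased because the repr is built from `multi_items()`:

    from httpx import Headers

    repr(Headers({"Authorization": "Bearer secret", "Accept": "*/*"}))
    # "Headers({'authorization': '[secure]', 'accept': '*/*'})"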

381 

382class Request: 

383 def __init__( 

384 self, 

385 method: str, 

386 url: URL | str, 

387 *, 

388 params: QueryParamTypes | None = None, 

389 headers: HeaderTypes | None = None, 

390 cookies: CookieTypes | None = None, 

391 content: RequestContent | None = None, 

392 data: RequestData | None = None, 

393 files: RequestFiles | None = None, 

394 json: typing.Any | None = None, 

395 stream: SyncByteStream | AsyncByteStream | None = None, 

396 extensions: RequestExtensions | None = None, 

397 ) -> None: 

398 self.method = method.upper() 

399 self.url = URL(url) if params is None else URL(url, params=params) 

400 self.headers = Headers(headers) 

401 self.extensions = {} if extensions is None else dict(extensions) 

402 

403 if cookies: 

404 Cookies(cookies).set_cookie_header(self) 

405 

406 if stream is None: 

407 content_type: str | None = self.headers.get("content-type") 

408 headers, stream = encode_request( 

409 content=content, 

410 data=data, 

411 files=files, 

412 json=json, 

413 boundary=get_multipart_boundary_from_content_type( 

414 content_type=content_type.encode(self.headers.encoding) 

415 if content_type 

416 else None 

417 ), 

418 ) 

419 self._prepare(headers) 

420 self.stream = stream 

421 # Load the request body, except for streaming content. 

422 if isinstance(stream, ByteStream): 

423 self.read() 

424 else: 

425 # There's an important distinction between `Request(content=...)`, 

426 # and `Request(stream=...)`. 

427 # 

428 # Using `content=...` implies automatically populated `Host` and content 

429 # headers, of either `Content-Length: ...` or `Transfer-Encoding: chunked`. 

430 # 

431 # Using `stream=...` will not automatically include *any* 

432 # auto-populated headers. 

433 # 

434 # As an end-user you don't really need `stream=...`. It's only 

435 # useful when: 

436 # 

437 # * Preserving the request stream when copying requests, eg for redirects. 

438 # * Creating request instances on the *server-side* of the transport API. 

439 self.stream = stream 

440 

441 def _prepare(self, default_headers: dict[str, str]) -> None: 

442 for key, value in default_headers.items(): 

443 # Ignore Transfer-Encoding if the Content-Length has been set explicitly. 

444 if key.lower() == "transfer-encoding" and "Content-Length" in self.headers: 

445 continue 

446 self.headers.setdefault(key, value) 

447 

448 auto_headers: list[tuple[bytes, bytes]] = [] 

449 

450 has_host = "Host" in self.headers 

451 has_content_length = ( 

452 "Content-Length" in self.headers or "Transfer-Encoding" in self.headers 

453 ) 

454 

455 if not has_host and self.url.host: 

456 auto_headers.append((b"Host", self.url.netloc)) 

457 if not has_content_length and self.method in ("POST", "PUT", "PATCH"): 

458 auto_headers.append((b"Content-Length", b"0")) 

459 

460 self.headers = Headers(auto_headers + self.headers.raw) 

461 
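To illustrate `_prepare`, a minimal sketch of the headers that get auto-populated when a request is built with `json=` content (exact values may vary slightly across httpx versions):

    from httpx import Request

    req = Request("POST", "https://example.org/upload", json={"size": 3})
    req.headers["Host"]            # 'example.org'
    req.headers["Content-Type"]    # 'application/json'
    req.headers["Content-Length"]  # the body length, as a string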

462 @property 

463 def content(self) -> bytes: 

464 if not hasattr(self, "_content"): 

465 raise RequestNotRead() 

466 return self._content 

467 

468 def read(self) -> bytes: 

469 """ 

470 Read and return the request content. 

471 """ 

472 if not hasattr(self, "_content"): 

473 assert isinstance(self.stream, typing.Iterable) 

474 self._content = b"".join(self.stream) 

475 if not isinstance(self.stream, ByteStream): 

476 # If a streaming request has been read entirely into memory, then 

477 # we can replace the stream with a raw bytes implementation, 

478 # to ensure that any non-replayable streams can still be used. 

479 self.stream = ByteStream(self._content) 

480 return self._content 

481 

482 async def aread(self) -> bytes: 

483 """ 

484 Read and return the request content. 

485 """ 

486 if not hasattr(self, "_content"): 

487 assert isinstance(self.stream, typing.AsyncIterable) 

488 self._content = b"".join([part async for part in self.stream]) 

489 if not isinstance(self.stream, ByteStream): 

490 # If a streaming request has been read entirely into memory, then 

491 # we can replace the stream with a raw bytes implementation, 

492 # to ensure that any non-replayable streams can still be used. 

493 self.stream = ByteStream(self._content) 

494 return self._content 

495 

496 def __repr__(self) -> str: 

497 class_name = self.__class__.__name__ 

498 url = str(self.url) 

499 return f"<{class_name}({self.method!r}, {url!r})>" 

500 

501 def __getstate__(self) -> dict[str, typing.Any]: 

502 return { 

503 name: value 

504 for name, value in self.__dict__.items() 

505 if name not in ["extensions", "stream"] 

506 } 

507 

508 def __setstate__(self, state: dict[str, typing.Any]) -> None: 

509 for name, value in state.items(): 

510 setattr(self, name, value) 

511 self.extensions = {} 

512 self.stream = UnattachedStream() 

513 

514 

515class Response: 

516 def __init__( 

517 self, 

518 status_code: int, 

519 *, 

520 headers: HeaderTypes | None = None, 

521 content: ResponseContent | None = None, 

522 text: str | None = None, 

523 html: str | None = None, 

524 json: typing.Any = None, 

525 stream: SyncByteStream | AsyncByteStream | None = None, 

526 request: Request | None = None, 

527 extensions: ResponseExtensions | None = None, 

528 history: list[Response] | None = None, 

529 default_encoding: str | typing.Callable[[bytes], str] = "utf-8", 

530 ) -> None: 

531 self.status_code = status_code 

532 self.headers = Headers(headers) 

533 

534 self._request: Request | None = request 

535 

536 # When follow_redirects=False and a redirect is received, 

537 # the client will set `response.next_request`. 

538 self.next_request: Request | None = None 

539 

540 self.extensions = {} if extensions is None else dict(extensions) 

541 self.history = [] if history is None else list(history) 

542 

543 self.is_closed = False 

544 self.is_stream_consumed = False 

545 

546 self.default_encoding = default_encoding 

547 

548 if stream is None: 

549 headers, stream = encode_response(content, text, html, json) 

550 self._prepare(headers) 

551 self.stream = stream 

552 if isinstance(stream, ByteStream): 

553 # Load the response body, except for streaming content. 

554 self.read() 

555 else: 

556 # There's an important distinction between `Response(content=...)`, 

557 # and `Response(stream=...)`. 

558 # 

559 # Using `content=...` implies automatically populated content headers, 

560 # of either `Content-Length: ...` or `Transfer-Encoding: chunked`. 

561 # 

562 # Using `stream=...` will not automatically include any content headers. 

563 # 

564 # As an end-user you don't really need `stream=...`. It's only 

565 # useful when creating response instances having received a stream 

566 # from the transport API. 

567 self.stream = stream 

568 

569 self._num_bytes_downloaded = 0 

570 
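A minimal sketch of building a `Response` by hand, as one might in tests; the `json=` shortcut sets both the body and a JSON content type:

    from httpx import Response

    r = Response(200, json={"ok": True})
    r.status_code, r.reason_phrase   # (200, 'OK')
    r.headers["Content-Type"]        # 'application/json'
    r.json()                         # {'ok': True}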

571 def _prepare(self, default_headers: dict[str, str]) -> None: 

572 for key, value in default_headers.items(): 

573 # Ignore Transfer-Encoding if the Content-Length has been set explicitly. 

574 if key.lower() == "transfer-encoding" and "content-length" in self.headers: 

575 continue 

576 self.headers.setdefault(key, value) 

577 

578 @property 

579 def elapsed(self) -> datetime.timedelta: 

580 """ 

581 Returns the time taken for the complete request/response 

582 cycle to complete. 

583 """ 

584 if not hasattr(self, "_elapsed"): 

585 raise RuntimeError( 

586 "'.elapsed' may only be accessed after the response " 

587 "has been read or closed." 

588 ) 

589 return self._elapsed 

590 

591 @elapsed.setter 

592 def elapsed(self, elapsed: datetime.timedelta) -> None: 

593 self._elapsed = elapsed 

594 

595 @property 

596 def request(self) -> Request: 

597 """ 

598 Returns the request instance associated to the current response. 

599 """ 

600 if self._request is None: 

601 raise RuntimeError( 

602 "The request instance has not been set on this response." 

603 ) 

604 return self._request 

605 

606 @request.setter 

607 def request(self, value: Request) -> None: 

608 self._request = value 

609 

610 @property 

611 def http_version(self) -> str: 

612 try: 

613 http_version: bytes = self.extensions["http_version"] 

614 except KeyError: 

615 return "HTTP/1.1" 

616 else: 

617 return http_version.decode("ascii", errors="ignore") 

618 

619 @property 

620 def reason_phrase(self) -> str: 

621 try: 

622 reason_phrase: bytes = self.extensions["reason_phrase"] 

623 except KeyError: 

624 return codes.get_reason_phrase(self.status_code) 

625 else: 

626 return reason_phrase.decode("ascii", errors="ignore") 

627 

628 @property 

629 def url(self) -> URL: 

630 """ 

631 Returns the URL for which the request was made. 

632 """ 

633 return self.request.url 

634 

635 @property 

636 def content(self) -> bytes: 

637 if not hasattr(self, "_content"): 

638 raise ResponseNotRead() 

639 return self._content 

640 

641 @property 

642 def text(self) -> str: 

643 if not hasattr(self, "_text"): 

644 content = self.content 

645 if not content: 

646 self._text = "" 

647 else: 

648 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

649 self._text = "".join([decoder.decode(self.content), decoder.flush()]) 

650 return self._text 

651 

652 @property 

653 def encoding(self) -> str | None: 

654 """ 

655 Return an encoding to use for decoding the byte content into text. 

656 The priority for determining this is given by... 

657 

658 * `.encoding = <>` has been set explicitly. 

659 * The encoding as specified by the charset parameter in the Content-Type header. 

660 * The encoding as determined by `default_encoding`, which may either be 

661 a string like "utf-8" indicating the encoding to use, or may be a callable 

662 which enables charset autodetection. 

663 """ 

664 if not hasattr(self, "_encoding"): 

665 encoding = self.charset_encoding 

666 if encoding is None or not _is_known_encoding(encoding): 

667 if isinstance(self.default_encoding, str): 

668 encoding = self.default_encoding 

669 elif hasattr(self, "_content"): 

670 encoding = self.default_encoding(self._content) 

671 self._encoding = encoding or "utf-8" 

672 return self._encoding 

673 
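A sketch of the priority order described above. The callable form of `default_encoding` is the hook used for charset autodetection; the lambda here merely stands in for a real detector:

    from httpx import Response

    r = Response(200, content="héllo".encode("latin-1"),
                 default_encoding=lambda raw: "latin-1")
    r.encoding, r.text   # ('latin-1', 'héllo')

    r = Response(200, content=b"{}",
                 headers={"Content-Type": "application/json; charset=utf-8"})
    r.encoding           # 'utf-8' (the charset parameter takes priority)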

674 @encoding.setter 

675 def encoding(self, value: str) -> None: 

676 """ 

677 Set the encoding to use for decoding the byte content into text. 

678 

679 If the `text` attribute has been accessed, attempting to set the 

680 encoding will throw a ValueError. 

681 """ 

682 if hasattr(self, "_text"): 

683 raise ValueError( 

684 "Setting encoding after `text` has been accessed is not allowed." 

685 ) 

686 self._encoding = value 

687 

688 @property 

689 def charset_encoding(self) -> str | None: 

690 """ 

691 Return the encoding, as specified by the Content-Type header. 

692 """ 

693 content_type = self.headers.get("Content-Type") 

694 if content_type is None: 

695 return None 

696 

697 return _parse_content_type_charset(content_type) 

698 

699 def _get_content_decoder(self) -> ContentDecoder: 

700 """ 

701 Returns a decoder instance which can be used to decode the raw byte 

702 content, depending on the Content-Encoding used in the response. 

703 """ 

704 if not hasattr(self, "_decoder"): 

705 decoders: list[ContentDecoder] = [] 

706 values = self.headers.get_list("content-encoding", split_commas=True) 

707 for value in values: 

708 value = value.strip().lower() 

709 try: 

710 decoder_cls = SUPPORTED_DECODERS[value] 

711 decoders.append(decoder_cls()) 

712 except KeyError: 

713 continue 

714 

715 if len(decoders) == 1: 

716 self._decoder = decoders[0] 

717 elif len(decoders) > 1: 

718 self._decoder = MultiDecoder(children=decoders) 

719 else: 

720 self._decoder = IdentityDecoder() 

721 

722 return self._decoder 

723 

724 @property 

725 def is_informational(self) -> bool: 

726 """ 

727 A property which is `True` for 1xx status codes, `False` otherwise. 

728 """ 

729 return codes.is_informational(self.status_code) 

730 

731 @property 

732 def is_success(self) -> bool: 

733 """ 

734 A property which is `True` for 2xx status codes, `False` otherwise. 

735 """ 

736 return codes.is_success(self.status_code) 

737 

738 @property 

739 def is_redirect(self) -> bool: 

740 """ 

741 A property which is `True` for 3xx status codes, `False` otherwise. 

742 

743 Note that not all responses with a 3xx status code indicate a URL redirect. 

744 

745 Use `response.has_redirect_location` to determine responses with a properly 

746 formed URL redirection. 

747 """ 

748 return codes.is_redirect(self.status_code) 

749 

750 @property 

751 def is_client_error(self) -> bool: 

752 """ 

753 A property which is `True` for 4xx status codes, `False` otherwise. 

754 """ 

755 return codes.is_client_error(self.status_code) 

756 

757 @property 

758 def is_server_error(self) -> bool: 

759 """ 

760 A property which is `True` for 5xx status codes, `False` otherwise. 

761 """ 

762 return codes.is_server_error(self.status_code) 

763 

764 @property 

765 def is_error(self) -> bool: 

766 """ 

767 A property which is `True` for 4xx and 5xx status codes, `False` otherwise. 

768 """ 

769 return codes.is_error(self.status_code) 

770 

771 @property 

772 def has_redirect_location(self) -> bool: 

773 """ 

774 Returns True for 3xx responses with a properly formed URL redirection, 

775 `False` otherwise. 

776 """ 

777 return ( 

778 self.status_code 

779 in ( 

780 # 301 (Cacheable redirect. Method may change to GET.) 

781 codes.MOVED_PERMANENTLY, 

782 # 302 (Uncacheable redirect. Method may change to GET.) 

783 codes.FOUND, 

784 # 303 (Client should make a GET or HEAD request.) 

785 codes.SEE_OTHER, 

786 # 307 (Equiv. 302, but retain method) 

787 codes.TEMPORARY_REDIRECT, 

788 # 308 (Equiv. 301, but retain method) 

789 codes.PERMANENT_REDIRECT, 

790 ) 

791 and "Location" in self.headers 

792 ) 

793 

794 def raise_for_status(self) -> Response: 

795 """ 

796 Raise the `HTTPStatusError` if one occurred. 

797 """ 

798 request = self._request 

799 if request is None: 

800 raise RuntimeError( 

801 "Cannot call `raise_for_status` as the request " 

802 "instance has not been set on this response." 

803 ) 

804 

805 if self.is_success: 

806 return self 

807 

808 if self.has_redirect_location: 

809 message = ( 

810 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n" 

811 "Redirect location: '{0.headers[location]}'\n" 

812 "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}" 

813 ) 

814 else: 

815 message = ( 

816 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n" 

817 "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}" 

818 ) 

819 

820 status_class = self.status_code // 100 

821 error_types = { 

822 1: "Informational response", 

823 3: "Redirect response", 

824 4: "Client error", 

825 5: "Server error", 

826 } 

827 error_type = error_types.get(status_class, "Invalid status code") 

828 message = message.format(self, error_type=error_type) 

829 raise HTTPStatusError(message, request=request, response=self) 

830 
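An illustrative use of `raise_for_status`; the request must be attached to the response, which a client normally does for you (it is passed explicitly here because the response is built by hand):

    from httpx import HTTPStatusError, Request, Response

    request = Request("GET", "https://example.org/missing")
    response = Response(404, request=request)
    try:
        response.raise_for_status()
    except HTTPStatusError as exc:
        print(exc.response.status_code)   # 404

    Response(200, request=request).raise_for_status()   # success: returns the response itself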

831 def json(self, **kwargs: typing.Any) -> typing.Any: 

832 return jsonlib.loads(self.content, **kwargs) 

833 

834 @property 

835 def cookies(self) -> Cookies: 

836 if not hasattr(self, "_cookies"): 

837 self._cookies = Cookies() 

838 self._cookies.extract_cookies(self) 

839 return self._cookies 

840 

841 @property 

842 def links(self) -> dict[str | None, dict[str, str]]: 

843 """ 

844 Returns the parsed header links of the response, if any 

845 """ 

846 header = self.headers.get("link") 

847 if header is None: 

848 return {} 

849 

850 return { 

851 (link.get("rel") or link.get("url")): link 

852 for link in _parse_header_links(header) 

853 } 

854 
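A short sketch of the `links` property; keys are the `rel` values, falling back to the URL when `rel` is absent:

    from httpx import Response

    r = Response(200, headers={"Link": '<https://example.org/page/2>; rel="next"'})
    r.links["next"]["url"]   # 'https://example.org/page/2'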

855 @property 

856 def num_bytes_downloaded(self) -> int: 

857 return self._num_bytes_downloaded 

858 

859 def __repr__(self) -> str: 

860 return f"<Response [{self.status_code} {self.reason_phrase}]>" 

861 

862 def __getstate__(self) -> dict[str, typing.Any]: 

863 return { 

864 name: value 

865 for name, value in self.__dict__.items() 

866 if name not in ["extensions", "stream", "is_closed", "_decoder"] 

867 } 

868 

869 def __setstate__(self, state: dict[str, typing.Any]) -> None: 

870 for name, value in state.items(): 

871 setattr(self, name, value) 

872 self.is_closed = True 

873 self.extensions = {} 

874 self.stream = UnattachedStream() 

875 

876 def read(self) -> bytes: 

877 """ 

878 Read and return the response content. 

879 """ 

880 if not hasattr(self, "_content"): 

881 self._content = b"".join(self.iter_bytes()) 

882 return self._content 

883 

884 def iter_bytes(self, chunk_size: int | None = None) -> typing.Iterator[bytes]: 

885 """ 

886 A byte-iterator over the decoded response content. 

887 This allows us to handle gzip, deflate, brotli, and zstd encoded responses. 

888 """ 

889 if hasattr(self, "_content"): 

890 chunk_size = len(self._content) if chunk_size is None else chunk_size 

891 for i in range(0, len(self._content), max(chunk_size, 1)): 

892 yield self._content[i : i + chunk_size] 

893 else: 

894 decoder = self._get_content_decoder() 

895 chunker = ByteChunker(chunk_size=chunk_size) 

896 with request_context(request=self._request): 

897 for raw_bytes in self.iter_raw(): 

898 decoded = decoder.decode(raw_bytes) 

899 for chunk in chunker.decode(decoded): 

900 yield chunk 

901 decoded = decoder.flush() 

902 for chunk in chunker.decode(decoded): 

903 yield chunk # pragma: no cover 

904 for chunk in chunker.flush(): 

905 yield chunk 

906 

907 def iter_text(self, chunk_size: int | None = None) -> typing.Iterator[str]: 

908 """ 

909 A str-iterator over the decoded response content 

910 that handles both gzip, deflate, etc but also detects the content's 

911 string encoding. 

912 """ 

913 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

914 chunker = TextChunker(chunk_size=chunk_size) 

915 with request_context(request=self._request): 

916 for byte_content in self.iter_bytes(): 

917 text_content = decoder.decode(byte_content) 

918 for chunk in chunker.decode(text_content): 

919 yield chunk 

920 text_content = decoder.flush() 

921 for chunk in chunker.decode(text_content): 

922 yield chunk # pragma: no cover 

923 for chunk in chunker.flush(): 

924 yield chunk 

925 

926 def iter_lines(self) -> typing.Iterator[str]: 

927 decoder = LineDecoder() 

928 with request_context(request=self._request): 

929 for text in self.iter_text(): 

930 for line in decoder.decode(text): 

931 yield line 

932 for line in decoder.flush(): 

933 yield line 

934 
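A small sketch of the iterators over an in-memory body; with a real streaming response the same methods yield chunks as they arrive rather than re-slicing `_content`:

    from httpx import Response

    r = Response(200, content=b"alpha\nbeta\ngamma")
    list(r.iter_lines())                                   # ['alpha', 'beta', 'gamma']
    [len(chunk) for chunk in r.iter_bytes(chunk_size=4)]   # [4, 4, 4, 4]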

935 def iter_raw(self, chunk_size: int | None = None) -> typing.Iterator[bytes]: 

936 """ 

937 A byte-iterator over the raw response content. 

938 """ 

939 if self.is_stream_consumed: 

940 raise StreamConsumed() 

941 if self.is_closed: 

942 raise StreamClosed() 

943 if not isinstance(self.stream, SyncByteStream): 

944 raise RuntimeError("Attempted to call a sync iterator on an async stream.") 

945 

946 self.is_stream_consumed = True 

947 self._num_bytes_downloaded = 0 

948 chunker = ByteChunker(chunk_size=chunk_size) 

949 

950 with request_context(request=self._request): 

951 for raw_stream_bytes in self.stream: 

952 self._num_bytes_downloaded += len(raw_stream_bytes) 

953 for chunk in chunker.decode(raw_stream_bytes): 

954 yield chunk 

955 

956 for chunk in chunker.flush(): 

957 yield chunk 

958 

959 self.close() 

960 

961 def close(self) -> None: 

962 """ 

963 Close the response and release the connection. 

964 Automatically called if the response body is read to completion. 

965 """ 

966 if not isinstance(self.stream, SyncByteStream): 

967 raise RuntimeError("Attempted to call a sync close on an async stream.") 

968 

969 if not self.is_closed: 

970 self.is_closed = True 

971 with request_context(request=self._request): 

972 self.stream.close() 

973 

974 async def aread(self) -> bytes: 

975 """ 

976 Read and return the response content. 

977 """ 

978 if not hasattr(self, "_content"): 

979 self._content = b"".join([part async for part in self.aiter_bytes()]) 

980 return self._content 

981 

982 async def aiter_bytes( 

983 self, chunk_size: int | None = None 

984 ) -> typing.AsyncIterator[bytes]: 

985 """ 

986 A byte-iterator over the decoded response content. 

987 This allows us to handle gzip, deflate, brotli, and zstd encoded responses. 

988 """ 

989 if hasattr(self, "_content"): 

990 chunk_size = len(self._content) if chunk_size is None else chunk_size 

991 for i in range(0, len(self._content), max(chunk_size, 1)): 

992 yield self._content[i : i + chunk_size] 

993 else: 

994 decoder = self._get_content_decoder() 

995 chunker = ByteChunker(chunk_size=chunk_size) 

996 with request_context(request=self._request): 

997 async for raw_bytes in self.aiter_raw(): 

998 decoded = decoder.decode(raw_bytes) 

999 for chunk in chunker.decode(decoded): 

1000 yield chunk 

1001 decoded = decoder.flush() 

1002 for chunk in chunker.decode(decoded): 

1003 yield chunk # pragma: no cover 

1004 for chunk in chunker.flush(): 

1005 yield chunk 

1006 

1007 async def aiter_text( 

1008 self, chunk_size: int | None = None 

1009 ) -> typing.AsyncIterator[str]: 

1010 """ 

1011 A str-iterator over the decoded response content 

1012 that handles both gzip, deflate, etc but also detects the content's 

1013 string encoding. 

1014 """ 

1015 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

1016 chunker = TextChunker(chunk_size=chunk_size) 

1017 with request_context(request=self._request): 

1018 async for byte_content in self.aiter_bytes(): 

1019 text_content = decoder.decode(byte_content) 

1020 for chunk in chunker.decode(text_content): 

1021 yield chunk 

1022 text_content = decoder.flush() 

1023 for chunk in chunker.decode(text_content): 

1024 yield chunk # pragma: no cover 

1025 for chunk in chunker.flush(): 

1026 yield chunk 

1027 

1028 async def aiter_lines(self) -> typing.AsyncIterator[str]: 

1029 decoder = LineDecoder() 

1030 with request_context(request=self._request): 

1031 async for text in self.aiter_text(): 

1032 for line in decoder.decode(text): 

1033 yield line 

1034 for line in decoder.flush(): 

1035 yield line 

1036 

1037 async def aiter_raw( 

1038 self, chunk_size: int | None = None 

1039 ) -> typing.AsyncIterator[bytes]: 

1040 """ 

1041 A byte-iterator over the raw response content. 

1042 """ 

1043 if self.is_stream_consumed: 

1044 raise StreamConsumed() 

1045 if self.is_closed: 

1046 raise StreamClosed() 

1047 if not isinstance(self.stream, AsyncByteStream): 

1048 raise RuntimeError("Attempted to call an async iterator on a sync stream.") 

1049 

1050 self.is_stream_consumed = True 

1051 self._num_bytes_downloaded = 0 

1052 chunker = ByteChunker(chunk_size=chunk_size) 

1053 

1054 with request_context(request=self._request): 

1055 async for raw_stream_bytes in self.stream: 

1056 self._num_bytes_downloaded += len(raw_stream_bytes) 

1057 for chunk in chunker.decode(raw_stream_bytes): 

1058 yield chunk 

1059 

1060 for chunk in chunker.flush(): 

1061 yield chunk 

1062 

1063 await self.aclose() 

1064 

1065 async def aclose(self) -> None: 

1066 """ 

1067 Close the response and release the connection. 

1068 Automatically called if the response body is read to completion. 

1069 """ 

1070 if not isinstance(self.stream, AsyncByteStream): 

1071 raise RuntimeError("Attempted to call an async close on a sync stream.") 

1072 

1073 if not self.is_closed: 

1074 self.is_closed = True 

1075 with request_context(request=self._request): 

1076 await self.stream.aclose() 

1077 

1078 

1079class Cookies(typing.MutableMapping[str, str]): 

1080 """ 

1081 HTTP Cookies, as a mutable mapping. 

1082 """ 

1083 

1084 def __init__(self, cookies: CookieTypes | None = None) -> None: 

1085 if cookies is None or isinstance(cookies, dict): 

1086 self.jar = CookieJar() 

1087 if isinstance(cookies, dict): 

1088 for key, value in cookies.items(): 

1089 self.set(key, value) 

1090 elif isinstance(cookies, list): 

1091 self.jar = CookieJar() 

1092 for key, value in cookies: 

1093 self.set(key, value) 

1094 elif isinstance(cookies, Cookies): 

1095 self.jar = CookieJar() 

1096 for cookie in cookies.jar: 

1097 self.jar.set_cookie(cookie) 

1098 else: 

1099 self.jar = cookies 

1100 

1101 def extract_cookies(self, response: Response) -> None: 

1102 """ 

1103 Loads any cookies based on the response `Set-Cookie` headers. 

1104 """ 

1105 urllib_response = self._CookieCompatResponse(response) 

1106 urllib_request = self._CookieCompatRequest(response.request) 

1107 

1108 self.jar.extract_cookies(urllib_response, urllib_request) # type: ignore 

1109 

1110 def set_cookie_header(self, request: Request) -> None: 

1111 """ 

1112 Sets an appropriate 'Cookie:' HTTP header on the `Request`. 

1113 """ 

1114 urllib_request = self._CookieCompatRequest(request) 

1115 self.jar.add_cookie_header(urllib_request) 

1116 

1117 def set(self, name: str, value: str, domain: str = "", path: str = "/") -> None: 

1118 """ 

1119 Set a cookie value by name. May optionally include domain and path. 

1120 """ 

1121 kwargs = { 

1122 "version": 0, 

1123 "name": name, 

1124 "value": value, 

1125 "port": None, 

1126 "port_specified": False, 

1127 "domain": domain, 

1128 "domain_specified": bool(domain), 

1129 "domain_initial_dot": domain.startswith("."), 

1130 "path": path, 

1131 "path_specified": bool(path), 

1132 "secure": False, 

1133 "expires": None, 

1134 "discard": True, 

1135 "comment": None, 

1136 "comment_url": None, 

1137 "rest": {"HttpOnly": None}, 

1138 "rfc2109": False, 

1139 } 

1140 cookie = Cookie(**kwargs) # type: ignore 

1141 self.jar.set_cookie(cookie) 

1142 

1143 def get( # type: ignore 

1144 self, 

1145 name: str, 

1146 default: str | None = None, 

1147 domain: str | None = None, 

1148 path: str | None = None, 

1149 ) -> str | None: 

1150 """ 

1151 Get a cookie by name. May optionally include domain and path 

1152 in order to specify exactly which cookie to retrieve. 

1153 """ 

1154 value = None 

1155 for cookie in self.jar: 

1156 if cookie.name == name: 

1157 if domain is None or cookie.domain == domain: 

1158 if path is None or cookie.path == path: 

1159 if value is not None: 

1160 message = f"Multiple cookies exist with name={name}" 

1161 raise CookieConflict(message) 

1162 value = cookie.value 

1163 

1164 if value is None: 

1165 return default 

1166 return value 

1167 
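A sketch of the dict-style and domain-aware access described above:

    from httpx import Cookies

    cookies = Cookies({"session": "abc123"})
    cookies["session"]                          # 'abc123'
    cookies.set("theme", "dark", domain="example.org")
    cookies.get("theme", domain="example.org")  # 'dark'
    cookies.get("theme", domain="other.org")    # None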

1168 def delete( 

1169 self, 

1170 name: str, 

1171 domain: str | None = None, 

1172 path: str | None = None, 

1173 ) -> None: 

1174 """ 

1175 Delete a cookie by name. May optionally include domain and path 

1176 in order to specify exactly which cookie to delete. 

1177 """ 

1178 if domain is not None and path is not None: 

1179 return self.jar.clear(domain, path, name) 

1180 

1181 remove = [ 

1182 cookie 

1183 for cookie in self.jar 

1184 if cookie.name == name 

1185 and (domain is None or cookie.domain == domain) 

1186 and (path is None or cookie.path == path) 

1187 ] 

1188 

1189 for cookie in remove: 

1190 self.jar.clear(cookie.domain, cookie.path, cookie.name) 

1191 

1192 def clear(self, domain: str | None = None, path: str | None = None) -> None: 

1193 """ 

1194 Delete all cookies. Optionally include a domain and path in 

1195 order to only delete a subset of all the cookies. 

1196 """ 

1197 args = [] 

1198 if domain is not None: 

1199 args.append(domain) 

1200 if path is not None: 

1201 assert domain is not None 

1202 args.append(path) 

1203 self.jar.clear(*args) 

1204 

1205 def update(self, cookies: CookieTypes | None = None) -> None: # type: ignore 

1206 cookies = Cookies(cookies) 

1207 for cookie in cookies.jar: 

1208 self.jar.set_cookie(cookie) 

1209 

1210 def __setitem__(self, name: str, value: str) -> None: 

1211 return self.set(name, value) 

1212 

1213 def __getitem__(self, name: str) -> str: 

1214 value = self.get(name) 

1215 if value is None: 

1216 raise KeyError(name) 

1217 return value 

1218 

1219 def __delitem__(self, name: str) -> None: 

1220 return self.delete(name) 

1221 

1222 def __len__(self) -> int: 

1223 return len(self.jar) 

1224 

1225 def __iter__(self) -> typing.Iterator[str]: 

1226 return (cookie.name for cookie in self.jar) 

1227 

1228 def __bool__(self) -> bool: 

1229 for _ in self.jar: 

1230 return True 

1231 return False 

1232 

1233 def __repr__(self) -> str: 

1234 cookies_repr = ", ".join( 

1235 [ 

1236 f"<Cookie {cookie.name}={cookie.value} for {cookie.domain} />" 

1237 for cookie in self.jar 

1238 ] 

1239 ) 

1240 

1241 return f"<Cookies[{cookies_repr}]>" 

1242 

1243 class _CookieCompatRequest(urllib.request.Request): 

1244 """ 

1245 Wraps a `Request` instance up in a compatibility interface suitable 

1246 for use with `CookieJar` operations. 

1247 """ 

1248 

1249 def __init__(self, request: Request) -> None: 

1250 super().__init__( 

1251 url=str(request.url), 

1252 headers=dict(request.headers), 

1253 method=request.method, 

1254 ) 

1255 self.request = request 

1256 

1257 def add_unredirected_header(self, key: str, value: str) -> None: 

1258 super().add_unredirected_header(key, value) 

1259 self.request.headers[key] = value 

1260 

1261 class _CookieCompatResponse: 

1262 """ 

1263 Wraps a `Response` instance up in a compatibility interface suitable 

1264 for use with `CookieJar` operations. 

1265 """ 

1266 

1267 def __init__(self, response: Response) -> None: 

1268 self.response = response 

1269 

1270 def info(self) -> email.message.Message: 

1271 info = email.message.Message() 

1272 for key, value in self.response.headers.multi_items(): 

1273 # Note that setting `info[key]` here is an "append" operation, 

1274 # not a "replace" operation. 

1275 # https://docs.python.org/3/library/email.compat32-message.html#email.message.Message.__setitem__ 

1276 info[key] = value 

1277 return info
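Finally, a rough sketch of how the compat wrappers above are used in practice: `extract_cookies` pulls `Set-Cookie` values from a response into the jar, and `set_cookie_header` writes the matching cookies onto an outgoing request (the exact `Cookie` header string depends on the standard library's `CookieJar`):

    from httpx import Cookies, Request, Response

    request = Request("GET", "https://example.org/")
    response = Response(200, headers=[("Set-Cookie", "session=abc123; Path=/")], request=request)

    cookies = Cookies()
    cookies.extract_cookies(response)
    cookies.get("session")            # 'abc123'

    follow_up = Request("GET", "https://example.org/profile")
    cookies.set_cookie_header(follow_up)
    follow_up.headers.get("Cookie")   # 'session=abc123'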